How to save an Apache Spark DataFrame as a dynamically partitioned table in Hive
In this article, I will show how to save a Spark DataFrame as a dynamically partitioned Hive table. The underlying files will be stored in S3. I will assume that we are using AWS EMR, so everything works out of the box, and we don’t have to configure S3 access or the use of the AWS Glue Data Catalog as the Hive Metastore.
saveAsTable and insertInto
The first thing we have to do is create a SparkSession with Hive support and set the partition overwrite mode configuration parameter to dynamic:
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    spark.sql('set spark.sql.sources.partitionOverwriteMode=dynamic')
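This setting matters because in the default static mode an overwrite without a partition specification removes all existing partitions of the table before writing, while in dynamic mode Spark replaces only the partitions that occur in the DataFrame being written. As a minimal sketch, the same option can also be set through the runtime configuration API instead of a SQL statement. The helper name enable_dynamic_partition_overwrite is my own and not part of Spark:

```python
def enable_dynamic_partition_overwrite(spark):
    """Switch the session to dynamic partition overwrite and return the active mode."""
    key = 'spark.sql.sources.partitionOverwriteMode'
    # Same effect as running: set spark.sql.sources.partitionOverwriteMode=dynamic
    spark.conf.set(key, 'dynamic')
    # Read the value back so the caller can verify what the session uses
    return spark.conf.get(key)
```

Calling enable_dynamic_partition_overwrite(spark) on the session created above should return 'dynamic'.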
Suppose that we have to store a DataFrame df partitioned by the date column and that the Hive table does not exist yet. In this case, we have to partition the DataFrame, specify the schema and table name to be created, and give Spark the S3 location where it should store the files:
    s3_location = 's3://some-bucket/path'
    # partitionBy is a method of the DataFrameWriter, so we call it on df.write
    df.write.partitionBy('date') \
        .saveAsTable('schema_name.table_name', path=s3_location)
If the table already exists, we must use the insertInto function instead of saveAsTable. However, first, we must check whether the table exists. The easiest way to do it is to use the show tables statement:
    from pyspark.sql.functions import col
    table_exist = spark.sql('show tables in ' + database).where(col('tableName') == table).count() == 1
When we use insertInto, we no longer need to explicitly partition the DataFrame (after all, the information about data partitioning is in the Hive Metastore, and Spark can access it without our help):
    df.write.insertInto("schema_name.table_name", overwrite=True)
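In the above example, I decided to overwrite the existing partitions. Because we use the dynamic partition overwrite mode, Spark replaces only the partitions that occur in the DataFrame. We can also append the DataFrame content to the existing partitions by passing overwrite=False.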
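Both cases can be combined into one helper. The following is a minimal sketch under stated assumptions, not a definitive implementation: the function name write_partitioned and its parameters are my own, the existence check uses spark.catalog.listTables instead of the show tables statement, and the column reordering is my addition. It is there because insertInto resolves columns by position, not by name, so the DataFrame columns should follow the column order of the existing table.

```python
def write_partitioned(spark, df, database, table, partition_col, s3_location,
                      overwrite=True):
    """Create the partitioned table on the first write, insert into it afterwards.

    All names are illustrative. Returns the name of the writer method used.
    """
    full_name = f'{database}.{table}'
    # Catalog lookup as an alternative to filtering the "show tables" output;
    # names are compared in lower case because Hive stores them that way
    existing = {t.name.lower() for t in spark.catalog.listTables(database)}
    if table.lower() in existing:
        # insertInto ignores column names and matches by position, so reorder
        # the DataFrame columns to the order used by the existing table
        aligned = df.select(*spark.table(full_name).columns)
        aligned.write.insertInto(full_name, overwrite=overwrite)
        return 'insertInto'
    # First write: create the table and store its files in the given location
    df.write.partitionBy(partition_col).saveAsTable(full_name, path=s3_location)
    return 'saveAsTable'
```

A call such as write_partitioned(spark, df, 'schema_name', 'table_name', 'date', 's3://some-bucket/path') would create the table on the first run and overwrite only the affected partitions on later runs, provided that the dynamic partition overwrite mode is enabled.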
Writing directly to S3
The other way to store data in a partitioned S3 structure is to write directly to the S3 location and refresh the partitions of the Athena table:
    s3_location = 's3://some-bucket/path/year=2020/month=10/day=04'

    df.repartition(1).write.parquet(s3_location, mode='overwrite')

    spark.sql('msck repair table schema_name.table_name')
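In practice, the partition path is usually derived from a date instead of being hardcoded. Here is a minimal sketch of that idea. The helper names partition_location and write_daily_partition are hypothetical, and I assume that the table is partitioned by year, month, and day, as the path in the example above suggests:

```python
from datetime import date


def partition_location(base_path, day):
    """Build a Hive-style year=/month=/day= partition path for the given date."""
    return (f"{base_path.rstrip('/')}"
            f"/year={day.year:04d}/month={day.month:02d}/day={day.day:02d}")


def write_daily_partition(spark, df, base_path, day, table_name):
    """Overwrite the files of one day and register the partition in the metastore."""
    location = partition_location(base_path, day)
    # A single output file per day, as in the example above
    df.repartition(1).write.parquet(location, mode='overwrite')
    # Scan the table location and add the partitions missing from the metastore
    spark.sql(f'msck repair table {table_name}')
    return location
```

For example, partition_location('s3://some-bucket/path', date(2020, 10, 4)) returns 's3://some-bucket/path/year=2020/month=10/day=04', the same location as in the hardcoded version.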