How to save an Apache Spark DataFrame as a dynamically partitioned table in Hive

In this article, I will show how to save a Spark DataFrame as a dynamically partitioned Hive table. The underlying files will be stored in S3. I will assume that we are using AWS EMR, so everything works out of the box, and we don’t have to configure S3 access or set up the AWS Glue Data Catalog as the Hive Metastore ourselves.

saveAsTable and insertInto

The first thing we have to do is create a SparkSession with Hive support and set the partition overwrite mode configuration parameter to dynamic:

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark.sql('set spark.sql.sources.partitionOverwriteMode=dynamic')
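Equivalently, the same option can be set while building the session instead of issuing a SET statement afterwards. This is just a sketch; the application name is an arbitrary placeholder:

from pyspark.sql import SparkSession

# Same effect as the SET statement above, but configured up front.
spark = (
    SparkSession.builder
    .appName('dynamic-partition-writer')  # placeholder name, not required
    .config('spark.sql.sources.partitionOverwriteMode', 'dynamic')
    .enableHiveSupport()
    .getOrCreate()
)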

Suppose that we have to store a DataFrame df partitioned by the date column and that the Hive table does not exist yet. In this case, we have to partition the DataFrame, specify the schema and the name of the table to be created, and give Spark the S3 location where it should store the files:

s3_location = 's3://some-bucket/path'

df.write \
    .partitionBy('date') \
    .saveAsTable('schema_name.table_name', path=s3_location)

If the table already exists, we must use the insertInto function instead of saveAsTable. First, however, we must check whether the table exists. The easiest way to do it is to use the show tables statement:

from pyspark.sql.functions import col

# `database` and `table` hold the names of the target database and table
table_exists = spark.sql('show tables in ' + database).where(col('tableName') == table).count() == 1

When we use insertInto, we no longer need to partition the DataFrame explicitly (after all, the information about data partitioning is already in the Hive Metastore, and Spark can access it without our help):

df.write.insertInto("schema_name.table_name", overwrite=True)

In the example above, I decided to overwrite the existing partitions (with the dynamic overwrite mode, only the partitions present in the DataFrame are replaced). We can also append the DataFrame content to the existing partitions instead, as shown in the sketch below.
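Putting both paths together, the whole write logic can be sketched like this. It assumes the spark session and the df DataFrame from the earlier snippets; the database, table, and S3 location names are placeholders, and passing overwrite=False switches insertInto to append mode:

from pyspark.sql.functions import col

database = 'schema_name'                 # placeholder database name
table = 'table_name'                     # placeholder table name
s3_location = 's3://some-bucket/path'    # placeholder S3 location

table_exists = spark.sql('show tables in ' + database) \
    .where(col('tableName') == table) \
    .count() == 1

if table_exists:
    # The table is registered in the Metastore, so insertInto reuses its partitioning.
    # overwrite=True replaces only the partitions present in df
    # (thanks to partitionOverwriteMode=dynamic); overwrite=False appends.
    df.write.insertInto(database + '.' + table, overwrite=True)
else:
    # First write: create the table and point it at the S3 location.
    df.write \
        .partitionBy('date') \
        .saveAsTable(database + '.' + table, path=s3_location)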

Writing directly to S3

The other way to store data in a partitioned S3 structure is to write directly to the S3 location and refresh the partitions of the Athena table:

s3_location = 's3://some-bucket/path/year=2020/month=10/day=04'

# repartition(1) produces a single output file for this partition
df.repartition(1).write.parquet(s3_location, mode='overwrite')

# make the Metastore pick up the new partition
spark.sql('msck repair table schema_name.table_name')
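
If we know exactly which partition was written, a single ALTER TABLE ... ADD PARTITION statement is a lighter alternative to MSCK REPAIR TABLE, which scans the whole table location. This is a sketch, assuming the year/month/day partition columns match the path above:

spark.sql("""
    ALTER TABLE schema_name.table_name
    ADD IF NOT EXISTS PARTITION (year='2020', month='10', day='04')
    LOCATION 's3://some-bucket/path/year=2020/month=10/day=04'
""")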