How to save an Apache Spark DataFrame as a dynamically partitioned table in Hive

This article is a part of my "100 data engineering tutorials in 100 days" challenge. (12/100)

In this article, I will show how to save a Spark DataFrame as a dynamically partitioned Hive table. The underlying files will be stored in S3. I will assume that we are using AWS EMR, so everything works out of the box, and we don’t have to configure S3 access or set up the AWS Glue Data Catalog as the Hive Metastore.

saveAsTable and insertInto

The first thing we have to do is create a SparkSession with Hive support and set the partition overwrite mode configuration parameter to dynamic:

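from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
# dynamic mode: an overwrite replaces only the partitions present in the new data
spark.sql('set spark.sql.sources.partitionOverwriteMode=dynamic')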
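To make the difference between the two modes concrete, here is a toy model in plain Python. It is not Spark code: the dictionary of partitions and the `overwrite` function are hypothetical and only mirror the behavior Spark documents for an overwrite without a static partition spec (which is what insertInto does). In the default static mode, every existing partition is deleted before writing; in dynamic mode, only the partitions that receive new data are replaced.

```python
from typing import Dict, List

# Hypothetical toy representation: partition value -> rows stored in that partition.
Partitions = Dict[str, List[str]]


def overwrite(existing: Partitions, incoming: Partitions, mode: str) -> Partitions:
    """Toy model of overwriting a partitioned table without a static partition spec."""
    if mode == "static":
        # Static mode: all existing partitions are deleted, only the new data remains.
        return {k: list(v) for k, v in incoming.items()}
    if mode == "dynamic":
        # Dynamic mode: partitions absent from the incoming data survive,
        # partitions present in the incoming data are replaced.
        result = {k: list(v) for k, v in existing.items()}
        result.update({k: list(v) for k, v in incoming.items()})
        return result
    raise ValueError(f"unknown mode: {mode!r}")


table = {"2020-10-03": ["a", "b"], "2020-10-04": ["c"]}
new_data = {"2020-10-04": ["d"]}
print(overwrite(table, new_data, "static"))   # {'2020-10-04': ['d']}
print(overwrite(table, new_data, "dynamic"))  # {'2020-10-03': ['a', 'b'], '2020-10-04': ['d']}
```

This is why the dynamic setting matters when we rewrite a single day: in static mode, the other days would be lost.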

Suppose that we have to store a DataFrame df partitioned by the date column and that the Hive table does not exist yet. In this case, we have to tell the DataFrame writer which column to partition by, specify the schema and table name to be created, and give Spark the S3 location where it should store the files:

s3_location = 's3://some-bucket/path'
df.write.partitionBy('date') \
    .saveAsTable('schema_name.table_name', path=s3_location)

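If the table already exists, we must use the insertInto function instead of saveAsTable (with the default save mode, saveAsTable fails when the table exists). Therefore, we first have to check whether the table exists. The easiest way to do it is to run the show tables statement and look for the table name (database and table are variables holding the schema and table names):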

from pyspark.sql.functions import col
table_exists = spark.sql('show tables in ' + database).where(col('tableName') == table).count() == 1
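The same check can also go through the catalog API instead of SQL. A minimal sketch, with a hypothetical helper name `table_exists`: `spark.catalog.tableExists` is available in PySpark 3.3 and later, and older versions can fall back to `spark.catalog.listTables`. The sketch assumes the metastore stores table names in lower case, as Hive does.

```python
def table_exists(spark, database: str, table: str) -> bool:
    """Hypothetical helper: True if database.table is registered in the metastore."""
    catalog = spark.catalog
    if hasattr(catalog, "tableExists"):
        # PySpark 3.3+: ask the catalog directly.
        return catalog.tableExists(table, dbName=database)
    # Older PySpark: list the tables in the database and compare names.
    # Hive stores table names in lower case, so compare case-insensitively.
    return any(t.name.lower() == table.lower() for t in catalog.listTables(database))
```

Usage: `if table_exists(spark, 'schema_name', 'table_name'): ...`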

When we use insertInto, we no longer need to call partitionBy (the information about data partitioning is already stored in the Hive Metastore, and Spark reads it from there):

df.write.insertInto("schema_name.table_name", overwrite=True)

In the example above, I overwrite the existing partitions; because of the dynamic overwrite mode, only the partitions present in df are replaced. To append the DataFrame content to the existing partitions instead, pass overwrite=False.
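One detail worth knowing: insertInto ignores column names and matches DataFrame columns to table columns by position, and in a Hive table the partition columns (here, date) come last. A minimal sketch of a guard against a misordered DataFrame, assuming df contains every column of the target table (the helper name `insert_into_table` is hypothetical):

```python
def insert_into_table(spark, df, table_name: str, overwrite: bool = True) -> None:
    """Hypothetical helper: write df into an existing table with insertInto.

    insertInto resolves columns by position, so df is first reordered to the
    table's column order (Hive lists partition columns last). Columns of df
    that do not exist in the table are dropped by the select.
    """
    target_columns = spark.table(table_name).columns
    df.select(*target_columns).write.insertInto(table_name, overwrite=overwrite)
```

Usage: `insert_into_table(spark, df, 'schema_name.table_name', overwrite=False)` appends instead of overwriting.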

Writing directly to S3

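Another way to store data in a partitioned S3 structure is to write the files directly to the partition's S3 location and then refresh the table's partitions in the metastore (with the Glue Data Catalog as the metastore, Athena sees the same table):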

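s3_location = 's3://some-bucket/path/year=2020/month=10/day=04'

# repartition(1) writes a single file into this partition directory
df.repartition(1).write.parquet(s3_location, mode='overwrite')
# scan the table location and register new partition directories in the metastore
spark.sql('msck repair table schema_name.table_name')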
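msck repair table scans the whole table location to discover partitions, which can take a while when the table has many of them. When we know exactly which directory we have just written, we can register only that partition with ALTER TABLE ... ADD PARTITION. The sketch below builds the Hive-style path and the matching statement; the helper names are hypothetical, and it assumes the year, month, and day partition columns are strings holding zero-padded values, as in the example path.

```python
from datetime import date


def partition_location(base: str, day: date) -> str:
    """Hypothetical helper: Hive-style prefix such as base/year=2020/month=10/day=04."""
    return f"{base.rstrip('/')}/year={day.year}/month={day.month:02d}/day={day.day:02d}"


def add_partition_sql(table: str, base: str, day: date) -> str:
    """Hypothetical helper: SQL that registers a single partition directory.

    Assumes string-typed partition columns with zero-padded month and day.
    """
    location = partition_location(base, day)
    return (
        f"ALTER TABLE {table} ADD IF NOT EXISTS "
        f"PARTITION (year='{day.year}', month='{day.month:02d}', day='{day.day:02d}') "
        f"LOCATION '{location}'"
    )
```

Usage: write to `partition_location('s3://some-bucket/path', date(2020, 10, 4))`, then run `spark.sql(add_partition_sql('schema_name.table_name', 's3://some-bucket/path', date(2020, 10, 4)))` instead of msck repair table.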

Bartosz Mikulski * data/machine learning engineer * conference speaker * co-founder of Software Craft Poznan & Poznan Scala User Group