Broadcast variables and broadcast joins in Apache Spark

This article is a part of my "100 data engineering tutorials in 100 days" challenge. (6/100)

A broadcast variable is an Apache Spark feature that lets us send a read-only copy of a variable to every worker node in the Spark cluster. Broadcast variables pay off mainly when we reuse the same value across multiple stages of a Spark job, but the same mechanism also lets us speed up joins.

In this article, we will look at broadcast variables and see how to use the same mechanism to perform a broadcast join.

Broadcast Variables

A broadcast variable is a wrapper provided by the SparkContext that serializes the data, sends it to every worker node, and reuses the variable in every task that needs it. It is crucial to remember that the value is read-only, and we cannot change it after creating the broadcast variable.

A common mistake is to believe that we should turn every variable into a broadcast variable. That does not make sense, because Spark already serializes every variable referenced by a task and sends it to the workers (as long as it is serializable), even if it is not a broadcast variable. The only benefit of broadcast variables is the ability to reuse them across multiple stages.

Suppose we use an operation that causes a shuffle (for example, join, groupBy, or repartition), and we use the variable both before and after that operation. In that case, it is a good idea to turn it into a broadcast variable. On the other hand, if we use the variable only once, we should avoid the broadcast variable feature because it adds clutter to the code with no real benefit.

Broadcast Join

One particular use case of broadcast variables may be beneficial even if we use the variable only once. When we join a huge DataFrame with a relatively tiny DataFrame (a config lookup table, dimension table in a data warehouse, or something similar in size), we can speed up the join by using the broadcast join.

When we do it, Spark will send the full copy of the broadcasted DataFrame to every worker node, and it will not waste time reshuffling both DataFrames to partition them by the join key.
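
To make the mechanics concrete, here is a plain-Python sketch of the idea. It is not how Spark implements the join internally; it only illustrates the principle under simplified assumptions: the small side is turned into a hash table once, and every partition of the large side is joined against that table in place. The function name and the sample data are hypothetical.

```python
# Plain-Python illustration of the idea; this is not Spark's implementation.

def broadcast_hash_join(large_partitions, small_rows):
    """Join each partition of the large side against a full copy of the small side."""
    # "Broadcast" step: build one hash table from the small side.
    # In Spark, every worker would receive its own copy of this data.
    lookup = {}
    for key, value in small_rows:
        lookup.setdefault(key, []).append(value)

    joined_partitions = []
    for partition in large_partitions:
        # Each partition is processed where it already is; no rows of the
        # large side move between partitions, so there is no shuffle.
        joined = [
            (key, left_value, right_value)
            for key, left_value in partition
            for right_value in lookup.get(key, [])
        ]
        joined_partitions.append(joined)
    return joined_partitions


# Hypothetical data: two partitions of a large table and a tiny lookup table.
large = [
    [("PL", 10), ("DE", 5)],
    [("PL", 7), ("XX", 1)],
]
small = [("PL", "Poland"), ("DE", "Germany")]

joined = broadcast_hash_join(large, small)
print(joined)
# [[('PL', 10, 'Poland'), ('DE', 5, 'Germany')], [('PL', 7, 'Poland')]]
```

The partition layout of the large side is unchanged after the join, which is exactly the work a shuffle-based join cannot avoid.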

To use the broadcast join feature, we have to wrap the broadcasted DataFrame using the broadcast function:

from pyspark.sql.functions import broadcast


Automatically Using the Broadcast Join

Broadcast join looks like such a trivial, low-level optimization that we may expect Spark to use it automatically, even if we don’t explicitly ask for it. It does: the optimization is controlled by the spark.sql.autoBroadcastJoinThreshold configuration parameter, whose default value is 10 MB.

According to the documentation:

spark.sql.autoBroadcastJoinThreshold configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join.

Note that this feature works only for data sources that provide statistics: when we read directly from files, and for Hive Metastore tables on which the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run.

We can disable the auto broadcast feature by setting its configuration parameter to -1.



Bartosz Mikulski

  • Data/MLOps engineer by day
  • DevRel/copywriter by night
  • Python and data engineering trainer
  • Conference speaker
  • Contributed a chapter to the book "97 Things Every Data Engineer Should Know"
  • Twitter: @mikulskibartosz