Broadcast variables and broadcast joins in Apache Spark

This article is a part of my "100 data engineering tutorials in 100 days" challenge. (6/100)

A broadcast variable is an Apache Spark feature that lets us send a read-only copy of a variable to every worker node in the Spark cluster. Broadcast variables are useful mainly when we want to reuse the same variable across multiple stages of a Spark job, but the feature also lets us speed up joins.

In this article, we will take a look at broadcast variables and see how we can use them to perform a broadcast join.

Broadcast Variables

A broadcast variable is a wrapper provided by the SparkContext that serializes the data, sends it to every worker node, and reuses the variable in every task that needs it. It is crucial to remember that the value is read-only, and we cannot change it after creating the broadcast variable.

A common mistake is to believe that we should turn every variable into a broadcast variable. That does not make sense, because Spark serializes and sends every variable we use to the workers anyway (as long as it is serializable), even if it is not a broadcast variable. The only benefit of broadcast variables is the ability to reuse them across multiple stages.

Suppose we use an operation that causes a reshuffle (for example, join, groupBy, or repartition), and we use the variable both before and after that operation. In that case, it is a good idea to turn it into a broadcast variable. On the other hand, if we use the variable only once, we should avoid the broadcast variable feature because it adds clutter to the code with no real benefit.

Broadcast Join

One particular use case of broadcast variables may be beneficial even if we use the variable only once. When we join a huge DataFrame with a relatively tiny DataFrame (a config lookup table, dimension table in a data warehouse, or something similar in size), we can speed up the join by using the broadcast join.

When we do it, Spark will send the full copy of the broadcasted DataFrame to every worker node, and it will not waste time reshuffling both DataFrames to partition them by the join key.

To use the broadcast join feature, we have to wrap the broadcasted DataFrame using the broadcast function:

from pyspark.sql.functions import broadcast

data_frame.join(
    broadcast(lookup_data_frame),
    lookup_data_frame.key_column == data_frame.key_column
)


Automatically Using the Broadcast Join

A broadcast join looks like such a trivial, low-level optimization that we may expect Spark to apply it automatically even if we don't explicitly instruct it to do so. This optimization is controlled by the spark.sql.autoBroadcastJoinThreshold configuration parameter, whose default value is 10 MB.

According to the documentation:

spark.sql.autoBroadcastJoinThreshold configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join.

Note that this feature works only in the case of data sources that provide statistics: when we are reading directly from files, and in the case of Hive Metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run.

We can disable the auto broadcast feature by setting its configuration parameter to -1.

Sources

  • https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#broadcast-variables
  • https://mungingdata.com/apache-spark/broadcast-joins/
  • https://mungingdata.com/apache-spark/broadcasting-maps-in-spark/



Bartosz Mikulski * data/machine learning engineer * conference speaker * co-founder of Software Craft Poznan & Poznan Scala User Group