Broadcast variables and broadcast joins in Apache Spark
A broadcast variable is an Apache Spark feature that lets us send a read-only copy of a variable to every worker node in the Spark cluster. The broadcast variables are useful only when we want to reuse the same variable across multiple stages of the Spark job, but the feature allows us to speed up joins too.
In this article, we will take a look at the broadcast variables and check how we can use them to perform the broadcast join.
A broadcast variable is a wrapper provided by the SparkContext that serializes the data, sends it to every worker node, and reuses the variable in every task that needs it. It is crucial to remember that the value is read-only, and we cannot change it after creating the broadcast variable.
A common mistake is to believe that we should turn every variable into a broadcast variable. That does not make sense, because Spark is going to serialize and send to the workers every variable we use anyway (as long as it is serializable), even if it is not a broadcast variable. The only benefit of using broadcast variables is the ability to reuse it across multiple stages.
Suppose we use an operation that causes a reshuffle (for example, join, groupby, or repartition), and we use the variable before and after that operation. In that case, it is a good idea to turn it into a broadcast variable. On the other hand, if we use the variable only once, we should avoid using the broadcast variable feature because it adds clutter to the code with no real benefit.
One particular use case of broadcast variables may be beneficial even if we use the variable only once. When we join a huge DataFrame with a relatively tiny DataFrame (a config lookup table, dimension table in a data warehouse, or something similar in size), we can speed up the join by using the broadcast join.
When we do it, Spark will send the full copy of the broadcasted DataFrame to every worker node, and it will not waste time reshuffling both DataFrames to partition them by the join key.
To use the broadcast join feature, we have to wrap the broadcasted DataFrame using the broadcast function:
1 2 3 4 5 6 from pyspark.sql.functions import broadcast data_frame.join( broadcast(lookup_data_frame), lookup_data_frame.key_column==data_frame.key_column )
Automatically Using the Broadcast Join
Broadcast join looks like such a trivial and low-level optimization that we may expect that Spark should automatically use it even if we don’t explicitly instruct it to do so. This optimization is controlled by the
spark.sql.autoBroadcastJoinThreshold configuration parameter, which default value is 10 MB.
According to the documentation:
spark.sql.autoBroadcastJoinThreshold configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join.
Note that this feature works only in the case of data sources that provide statistics: when we are reading directly from files and in the case of Hive Metastore tables where the command
ANALYZE TABLE <tableName> COMPUTE STATISTICS noscanhas been run.
We can disable the auto broadcast feature by setting its configuration parameter to -1.
You may also like
- How to speed up a PySpark job
- How to run PySpark code using the Airflow SSHOperator
- What is the difference between cache and persist in Apache Spark?
- What is the difference between CUBE and ROLLUP and how to use it in Apache Spark?
- What is the difference between repartition and coalesce in Apache Spark?