If you work with Apache Spark, shuffling is probably the leading cause of your problems.

Have you ever seen a computation failure because the data doesn’t fit on a single worker node? Do you wonder why Apache Spark couldn’t use multiple nodes in such cases?

Does your cluster spend most of the computing time sending data between worker nodes, and all your queries are painfully slow?

Every time something bad happens in Spark, blaming shuffling is a good guess, even if you don’t know what happens. What is shuffling, though? And why do we need it?

What is shuffling?

Apache Spark processes queries by distributing data over multiple nodes and calculating the values separately on every node. However, occasionally, the nodes need to exchange the data. After all, that’s the purpose of Spark - processing data that doesn’t fit on a single machine.

Shuffling is the process of exchanging data between partitions. As a result, data rows can move between worker nodes when their source partition and the target partition reside on a different machine.

Spark doesn’t move data between nodes randomly. Shuffling is a time-consuming operation, so it happens only when there is no other option.

When do we need shuffling?

Spark nodes read chunks of the data (data partitions), but they don’t send the data between each other unless they need to. When do they do it? When you explicitly request data repartitioning (using the repartition functions), when you join DataFrames or group data by a column value.

In the case of repartitioning, we shouldn’t be surprised. We requested repartitioning, so Spark does what we wanted. What about joining and grouping?

Imagine you have to pack 150 birthday presents. Chill out ;) You aren’t doing it alone. Two other people work with you. You’re preparing lousy gifts: a pair of socks and a scented candle, but the recipient could pick the scent! Who wouldn’t like to choose what crappy gift they get? ;)

Packing the socks is relatively easy. You split the gift bags among the team, and every person puts 50 pairs of socks into the bags. That’s like being a Spark node without the shuffle operation. You do your part of the work and don’t care about the other workers.

However, the job gets tougher as soon as your actions depend on the data you see. While packing the candles, you have to see who gets the present, find the person’s favorite scent on the list, pick the candle from the correct box (or bring the package from the storage room), and put the candle in the right bag.

But we can make it easier! Before we even start, everyone groups their bags by the desired candle scent. You end up with nine piles of bags (3 piles per person). Now, every one of you picks one scent. You got lavender. You have to gather all bags of people who want a lavender candle and bring them to your part of the room. Your colleagues do the same. As a result, every worker has the data they need.

You have just completed the first shuffle operation.

Now, you can go to the storage room. In the storage room, you do the second part of the shuffling. After all, you need only the boxes with lavender candles. You search for the right boxes and bring them to the packing room. Now, you can put the candles into the bags you prepared earlier. Congratulations. You have joined two “datasets.” ;)

Spark isn’t as clever as a human, so it needs a mathematical operation to determine which data belongs to a worker. When we join the data in Spark, it needs to put the data in both DataFrames in buckets. Those buckets are calculated by hashing the partitioning key (the column(s) we use for joining) and splitting the data into a predefined number of buckets.

We can control the number of buckets using the spark.sql.shuffle.partitions parameter.

The same hashing and partitioning happen in both datasets we join. The corresponding partitions from both datasets are transferred to the same worker node. The goal is to have the data locally on the same worker before we start joining the values. We did the same when we sorted the gift bags and brought the right boxes from the storage room in our present packing example.

Spark partitions the data also when we run a grouping operation and calculate an aggregate. This time we have only one dataset, but we still need the data that belongs to a single group on a single worker node. Otherwise, we couldn’t calculate the aggregations.

Of course, some aggregations, like calculating the number of elements in a group or a sum of the parts, don’t require moving data to a single node. If we calculate the sum on every node separately and then move the results to a single node, we can calculate the final sum.

However, query optimization by calculating partial aggregations isn’t always possible. Therefore, in general, grouping requires the shuffle operation.

What’s the most common problem with shuffling in Apache Spark?

What if one worker node receives more data than any other worker? You will have to wait for that worker to finish processing while others do nothing.

While packing birthday presents, the other two people could help you if it turned out that 120 people want a lavender candle and you are the one doing most of the work. It’s natural for humans, but Apache Spark node workers can’t coordinate and share parts of a single partition. The unlucky worker node has to finish the task on its own while the others wait.

What’s even worse, the worker may fail, and the cluster will need to repeat a part of the computation. You can avoid such problems by enabling speculative execution - use the spark.speculation parameter. With this feature enabled, the idle workers calculate a copy of long-running tasks, and the cluster uses the results produced by the worker who finished sooner.

When does one worker receive more data than others?

It happens when one value dominates the partitioning key (for example, the null). All rows with the same partitioning key value must be processed by the same worker node (in the case of partitioning). So if we have 70% of null values in the partitioning key, one node will get at least 70% of the rows.

When this happens, we have two options.

First, we can isolate the dominating value by filtering it out from the DataFrame. After filtering, we can calculate the aggregate using the remaining rows. After that, we can calculate the aggregate of the filtered out value separately. It may help because we should have more resources available while calculating the aggregate of a smaller dataset.

We can also redistribute the values of the filtered out dataset using a different partitioning key, calculate a partial aggregate, and then combine them to get the final result.

The second solution is to create a surrogate partitioning key by combining multiple columns or generating an artificial partitioning value. Such a key should help us uniformly distribute the work across all worker nodes.

Of course, sometimes, we can’t split the work into partial aggregates or use surrogate keys because we would get an invalid aggregate value. We need to deploy a Spark cluster with more powerful nodes for such situations.

Can’t we do better?

The simplicity of the partitioning algorithm causes all of the problems.

We split the data once before calculations. Every worker gets an entire partition (sometimes multiple partitions), and they don’t share the data until everyone finishes their task.

Wouldn’t it be better if nodes coordinated the work and helped the worker who got stuck with the largest partition? I don’t think so. In such a case, we would complain about an inefficient coordination algorithm. The process would resemble pushing papers between corporate departments. Can you imagine doing it efficiently?

Older post

What is the root cause of problems in software engineering?

What is the primary, unrepairable cause of almost all bugs, data leaks, human problems, etc.?

Newer post

Python decorators explained

How can we define a Python decorator, and when should we use Python decorators.