
Broadcast join & Spark Performance Optimisation


I am a data engineer at Tesco and this blog is part of a mentoring process to track the progress of my career development journey.

  1. Advantages of Broadcast Hash Join

    - Reduced Shuffling: Broadcast Hash Join sends a full copy of the smaller DataFrame to every executor, so the larger DataFrame never has to be shuffled across the network. This cuts network I/O and speeds up the join [1].
    - Efficiency with Small Tables: It works best when a large table is joined to a small one, because only the small table needs to be broadcast [2].
    - In-Memory Computation: Each executor keeps the broadcast table in memory and probes it locally, which is faster than join strategies that may spill to disk [2].
    - Handles Data Skew: Because the large side is not repartitioned by join key, a heavily repeated key is not funnelled into a single shuffle partition [1].
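To make these advantages concrete, here is a toy model in plain Python (no Spark required): the small table is turned into a hash table once, every partition of the large table probes that broadcast copy, and no large-side row ever moves to another partition. The function names, the list-of-dicts row format and the sample data are hypothetical; Spark's real implementation works on binary rows and broadcast variables.

```python
from collections import defaultdict

def build_hash_table(small_rows, key):
    """Build side: index every row of the small table by its join key."""
    table = defaultdict(list)
    for row in small_rows:
        table[row[key]].append(row)
    return dict(table)

def broadcast_hash_join(large_partitions, small_rows, key):
    """Toy inner join: each large-table partition probes the broadcast hash table.

    Output partition i is built only from input partition i, so the large
    side is never reshuffled by join key.
    """
    hash_table = build_hash_table(small_rows, key)  # built once, then "broadcast"
    result = []
    for partition in large_partitions:  # one partition ~ one task on an executor
        out = []
        for row in partition:
            for match in hash_table.get(row[key], []):  # local, in-memory lookup
                out.append({**row, **match})
        result.append(out)
    return result

# Hypothetical data: a partitioned "sales" table and a tiny "stores" table.
sales = [
    [{"id": 1, "amount": 10}, {"id": 2, "amount": 20}],
    [{"id": 1, "amount": 30}, {"id": 3, "amount": 40}],
]
stores = [{"id": 1, "store": "A"}, {"id": 2, "store": "B"}]

joined = broadcast_hash_join(sales, stores, "id")
print(joined)
```

The unmatched `id` 3 row drops out (inner join semantics), and rows for the frequent key 1 are joined wherever they already live rather than being gathered into one partition.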

  2. Disadvantages of Broadcast Hash Join

    - Memory Constraints: The smaller DataFrame must fit into the memory of each executor. If it doesn't, you may encounter out-of-memory (OOM) errors [1].
    - Not Suitable for Large Tables: It is a poor fit when both DataFrames are large, because broadcasting a large DataFrame to every executor would be slow and memory-hungry [1].
    - Network Intensive: Every executor receives a full copy of the broadcast table, so the larger that table is, the more network traffic the broadcast generates and the longer the job can take [3].
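A rough, back-of-envelope way to see why broadcasting stops paying off for large tables is to compare how many bytes each strategy moves. The model below is a deliberate simplification (an assumption, not how Spark estimates cost): a shuffle join is assumed to send both tables across the network once, while a broadcast join is assumed to deliver one full copy of the small table to every executor. It ignores compression, data that is already local, and Spark's peer-to-peer (torrent) broadcast, which spreads the sending load but still leaves a full copy on each executor.

```python
MB = 1024 * 1024
GB = 1024 * MB

def shuffle_join_bytes(large_bytes: int, small_bytes: int) -> int:
    """Simplified: both sides are repartitioned by key, so both cross the network once."""
    return large_bytes + small_bytes

def broadcast_join_bytes(small_bytes: int, num_executors: int) -> int:
    """Simplified: every executor receives (and keeps in memory) a full copy."""
    return small_bytes * num_executors

def prefer_broadcast(large_bytes: int, small_bytes: int, num_executors: int) -> bool:
    """True when the simplified model says broadcasting moves fewer bytes."""
    return broadcast_join_bytes(small_bytes, num_executors) < shuffle_join_bytes(
        large_bytes, small_bytes
    )

# Hypothetical cluster with 50 executors.
print(prefer_broadcast(200 * GB, 20 * MB, 50))  # True: large fact table + small dimension
print(prefer_broadcast(200 * GB, 50 * GB, 50))  # False: two large tables
```

In the second case the broadcast would also require 50 GB of memory on every executor, which is the memory constraint listed first.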

  3. Can Broadcast Hash Join Cause OOM Exception?

    Yes. A broadcast hash join can fail with an OOM error if the broadcast DataFrame is too large to fit into executor memory.

Example: if smallDF in a join such as largeDF.join(broadcast(smallDF), "id") is larger than the memory available on the executors, the attempt to broadcast it fails with an OOM error, which Spark can report as an OutOfMemorySparkException [4].

Broadcast Hash Join also depends on driver memory. Before broadcasting, Spark collects the smaller DataFrame to the driver and builds the broadcast relation there, so if that DataFrame does not fit into driver memory, the driver itself can fail with an OOM error. The main constraint, however, is usually on the executors: each one must hold a full copy of the broadcast table, including the hash table used for the join, alongside the memory its own tasks need.
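The two failure points can be summarised in a toy model: first the driver must be able to hold the collected table, then every executor must be able to hold its broadcast copy. The function and byte figures below are hypothetical and only illustrate the order of the checks; Spark exposes no such function, and real memory use also includes serialization and hash-table overhead.

```python
from typing import Sequence

def simulate_broadcast(table_bytes: int, driver_free: int, executor_free: Sequence[int]) -> str:
    """Toy model of where a broadcast join can run out of memory (hypothetical)."""
    # Step 1: the driver collects the small side and builds the broadcast relation.
    if table_bytes > driver_free:
        return "OOM on driver"
    # Step 2: every executor must hold a full copy next to its tasks' working memory.
    for i, free in enumerate(executor_free):
        if table_bytes > free:
            return f"OOM on executor {i}"
    return "ok"

print(simulate_broadcast(100, driver_free=500, executor_free=[200, 300]))  # ok
print(simulate_broadcast(600, driver_free=500, executor_free=[1000]))      # OOM on driver
print(simulate_broadcast(250, driver_free=500, executor_free=[300, 200]))  # OOM on executor 1
```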

  4. Ways to Ensure a Table is Considered for Broadcast Hash Join:

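    Use Broadcast Hints: Explicitly mark a DataFrame as broadcastable with the broadcast function in PySpark. Spark then broadcasts it regardless of spark.sql.autoBroadcastJoinThreshold, provided the join type allows that side to be broadcast (for example, only the right side of a left outer join can be) [5].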

```python
from pyspark.sql.functions import broadcast
# Hint Spark to ship smallDF to every executor instead of shuffling both sides
joinDF = largeDF.join(broadcast(smallDF), "id")
```

Adjust Configuration: Raise spark.sql.autoBroadcastJoinThreshold (default 10 MB; -1 disables automatic broadcasting) so that larger tables are broadcast automatically. The value is in bytes. For example, to raise it to 100 MB:

```python
# Raise the automatic broadcast limit to 100 MB (100 * 1024 * 1024 bytes)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 104857600)
```
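Spark compares this threshold with its estimated size of the table (from statistics), not with a measured in-memory size. The helper below is a plain-Python sketch that mirrors the rule as the Spark configuration docs describe it: a table is a candidate when its estimated size is at most the threshold, and -1 disables automatic broadcasting. The function names are hypothetical; the sketch is only meant for checking the byte arithmetic before calling spark.conf.set.

```python
DEFAULT_THRESHOLD = 10 * 1024 * 1024  # documented default: 10 MB

def mb_to_bytes(mb: int) -> int:
    """The threshold is given in bytes, so 100 MB = 100 * 1024 * 1024."""
    return mb * 1024 * 1024

def qualifies_for_auto_broadcast(estimated_bytes: int, threshold_bytes: int) -> bool:
    """Hypothetical helper mirroring the documented autoBroadcastJoinThreshold rule."""
    if threshold_bytes < 0:  # -1 turns automatic broadcasting off
        return False
    return estimated_bytes <= threshold_bytes

print(mb_to_bytes(100))                                                  # 104857600
print(qualifies_for_auto_broadcast(mb_to_bytes(60), DEFAULT_THRESHOLD))  # False
print(qualifies_for_auto_broadcast(mb_to_bytes(60), mb_to_bytes(100)))   # True
```

A broadcast() hint bypasses this size check, so the threshold only matters for joins where Spark chooses the strategy on its own.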