Apache Spark Aggregation methods

I am a data engineer at Tesco and this blog is part of a mentoring process to track the progress of my career development journey.

Spark has two primary methods of performing aggregations:

  • Sort-based and

  • Hash-based.

Each is optimized for different scenarios and has distinct performance characteristics.

| HashAggregate | SortAggregate |
| --- | --- |
| Faster because it avoids sorting data | Sorts data based on group keys before aggregation |
| Memory constraints | Used when aggregate functions or group keys are not supported by hash-based aggregation |
| May fall back to sort-based if the dataset is too large or has too many unique keys, leading to memory pressure | Can handle larger datasets |
| It uses off-heap memory for storing the aggregation map | It streams data through disk and memory |

HashAggregate is the preferred method for aggregation in Spark SQL. This method creates a hash table where each entry corresponds to a unique group key. As Spark processes rows, it uses the group key to locate the corresponding entry in the hash table and updates the aggregate values in place. This method is generally faster because it avoids sorting the data before aggregation.

It requires that all intermediate aggregate values fit into memory. If the dataset is too large or there are too many unique keys, Spark might be unable to use hash-based aggregation due to memory constraints.

  1. It is preferred when the aggregate functions and group by keys are supported by the hash aggregation strategy.

  2. It can be significantly faster than sort-based aggregation because it avoids sorting data.

  3. It stores the aggregation map in compact binary memory managed by Spark, which is off-heap when off-heap memory is enabled (spark.memory.offHeap.enabled).

  4. It may fall back to sort-based aggregation if the dataset is too large or has too many unique keys, leading to memory pressure.
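The core idea of hash-based aggregation can be sketched in a few lines of plain Python. This is only an illustration of the technique, not Spark's implementation: the function name `hash_aggregate` and the sample rows are made up for the example, and a Python dict stands in for Spark's aggregation map.

```python
from collections import defaultdict

def hash_aggregate(rows):
    """Sum the value per group key using one hash-table lookup per row."""
    totals = defaultdict(int)      # one entry per unique group key
    for key, amount in rows:
        totals[key] += amount      # update the aggregate in place, no sorting
    return dict(totals)

# Hypothetical input: (group key, amount) pairs in arbitrary order.
rows = [("b", 2), ("a", 1), ("b", 5), ("a", 4)]
result = hash_aggregate(rows)
print(result)
```

Note that the table holds every distinct key at once, which is why the number of unique keys drives the memory cost.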

SortAggregate is used when HashAggregate is not feasible, either due to memory constraints or because the aggregation functions or group by keys are not supported by the hash aggregation strategy. This method sorts the data by the group by keys and then processes the sorted data to compute aggregate values. It can handle larger datasets since only some intermediate results need to fit into memory. It is generally slower than hash-based aggregation due to the additional sorting step.

  1. It is used when hash-based aggregation is not feasible due to memory constraints or unsupported aggregation functions or group by keys.

  2. It involves sorting the data based on the group by keys before performing the aggregation.

  3. It can handle larger datasets since it streams data through disk and memory.
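For comparison, the same aggregation can be written in sort-based style. This is a rough sketch in plain Python, not Spark code: `sort_aggregate` is a made-up name, and Python's in-memory `sorted` stands in for Spark's sort, which can use disk.

```python
from itertools import groupby
from operator import itemgetter

def sort_aggregate(rows):
    """Sort by group key, then sum each contiguous run of equal keys."""
    ordered = sorted(rows, key=itemgetter(0))   # the extra sorting step
    for key, group in groupby(ordered, key=itemgetter(0)):
        # Only the rows of the current group are consumed here.
        yield key, sum(amount for _, amount in group)

# Hypothetical input: (group key, amount) pairs in arbitrary order.
rows = [("b", 2), ("a", 1), ("b", 5), ("a", 4)]
result = list(sort_aggregate(rows))
print(result)
```

Once the rows are sorted, the aggregation itself never needs more than one group at a time, at the price of the sort.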

Detailed Explanation of Hash-based Aggregation

- Initialization: When a query that requires aggregation is executed, Spark determines whether it can use hash-based aggregation. This decision is based on factors such as the types of aggregation functions (e.g., sum, avg, min, max, count), the data types of the columns involved, and whether the dataset is expected to fit into memory.

- Partial Aggregation (Map Side): The aggregation process begins with a “map-side” partial aggregation. For each partition of the input data, Spark creates an in-memory hash map where each entry corresponds to a unique group key. As rows are processed, Spark updates the aggregation buffer for each group key directly in the hash map. This step produces partial aggregate results for each partition.

- Shuffling: After the partial aggregation, Spark shuffles the data by the grouping keys, so that all records belonging to the same group are moved to the same partition. This step is necessary to ensure that the final aggregation produces accurate results across the entire dataset.

- Final Aggregation (Reduce Side): Once the shuffled data is partitioned, Spark performs the final aggregation. It again uses a hash map to aggregate the partially aggregated results. This step combines the partial results from different partitions to produce the final aggregate value for each group.

- Spill to Disk: If the dataset is too large to fit into memory, Spark’s hash-based aggregation can spill data to disk. This mechanism ensures that Spark can handle datasets larger than the available memory by using external storage.

- Fallback to Sort-based Aggregation: In cases where the hash map becomes too large or if there are memory issues, Spark can fall back to sort-based aggregation. This decision is made dynamically based on runtime conditions and memory availability.

- Output: The final output of the HashAggregateExec operator is a new dataset where each row represents a group along with its aggregated value(s).
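The partial aggregation, shuffle, and final aggregation steps above can be modelled with plain Python lists standing in for partitions. This is a simplified sketch under assumptions: the function names, the `(sum, count)` buffer layout for an average, and the use of Python's built-in `hash` as the partitioner are all choices made for the example, not Spark internals.

```python
def partial_aggregate(partition):
    """Map side: one hash map per input partition, holding (sum, count)."""
    buffers = {}
    for key, value in partition:
        s, c = buffers.get(key, (0, 0))
        buffers[key] = (s + value, c + 1)
    return buffers

def shuffle(partials, num_partitions):
    """Send every partial buffer for a key to the same output partition."""
    out = [[] for _ in range(num_partitions)]
    for buffers in partials:
        for key, buf in buffers.items():
            out[hash(key) % num_partitions].append((key, buf))
    return out

def final_aggregate(shuffled_partition):
    """Reduce side: merge partial buffers, then compute the average."""
    merged = {}
    for key, (s, c) in shuffled_partition:
        ms, mc = merged.get(key, (0, 0))
        merged[key] = (ms + s, mc + c)
    return {key: s / c for key, (s, c) in merged.items()}

# Hypothetical input: two partitions of (group key, value) rows.
input_partitions = [[("a", 1), ("b", 2), ("a", 3)], [("b", 6), ("a", 5)]]
partials = [partial_aggregate(p) for p in input_partitions]
result = {}
for part in shuffle(partials, num_partitions=2):
    result.update(final_aggregate(part))
print(result)
```

The map side sends one buffer per key per partition instead of every row, which is what makes the shuffle cheaper. Keeping `(sum, count)` rather than a running average is what allows partial results to be merged correctly.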

The efficiency of hash-based aggregation comes from its ability to perform in-place updates to the aggregation buffer and its avoidance of sorting the data. However, its effectiveness is limited by the available memory and the nature of the dataset. For datasets that do not fit well into memory or when dealing with complex aggregation functions that are not supported by hash-based aggregation, Spark might opt for sort-based aggregation instead.
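A toy model can make the spill and fallback behaviour more concrete. The sketch below assumes a hypothetical `max_groups` limit as a stand-in for memory pressure and keeps "spilled" runs in a Python list instead of on disk. It shows the general idea of switching from hashing to merging sorted runs, not the actual logic inside Spark.

```python
import heapq
from itertools import groupby
from operator import itemgetter

def aggregate_with_fallback(rows, max_groups):
    """Hash-aggregate sums; spill sorted runs when the table gets too big."""
    runs = []    # sorted runs, a stand-in for spill files on disk
    table = {}
    for key, value in rows:
        if key not in table and len(table) >= max_groups:
            runs.append(sorted(table.items()))   # spill the full table
            table = {}
        table[key] = table.get(key, 0) + value
    if not runs:
        return sorted(table.items())             # everything fit in the table
    runs.append(sorted(table.items()))
    # Sort-based finish: merge the runs so equal keys become contiguous.
    merged = heapq.merge(*runs, key=itemgetter(0))
    return [(key, sum(v for _, v in group))
            for key, group in groupby(merged, key=itemgetter(0))]

# Hypothetical input with three distinct keys.
rows = [("c", 1), ("a", 2), ("b", 3), ("a", 4), ("c", 5)]
small = aggregate_with_fallback(rows, max_groups=2)    # forces spilling
large = aggregate_with_fallback(rows, max_groups=10)   # pure hash path
print(small, large)
```

Both calls produce the same totals. Only the path taken differs, which is the point of a fallback.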

Detailed Explanation of Sort-based Aggregation

- Shuffling: The data is partitioned across the cluster based on the grouping keys. This step ensures that all records with the same key end up in the same partition.

- Sorting: Within each partition, the data is sorted by the grouping keys. This is necessary because the aggregation will be performed on groups of data with the same key, and having the data sorted ensures that all records for a given key are contiguous.

- Aggregation: Once the data is sorted, Spark can perform the aggregation. For each partition, Spark uses a SortBasedAggregationIterator to iterate over the sorted records. This iterator maintains a buffer row to cache the aggregated values for the current group.

- Processing Rows: As the iterator goes through the rows, it processes them one by one, updating the buffer with the aggregate values. When the end of a group is reached (i.e., the next row has a different grouping key), the iterator outputs a row with the final aggregate value for that group and resets the buffer for the next group.

- Memory Management: Unlike hash-based aggregation, which requires a hash map to hold all group keys and their corresponding aggregate values, sort-based aggregation only needs to maintain the aggregate buffer for the current group. This means that sort-based aggregation can handle larger datasets that might not fit entirely in memory.

- Fallback Mechanism: Although not part of the normal operation, it’s worth noting that Spark’s HashAggregateExec can theoretically fall back to sort-based aggregation if it encounters memory issues during hash-based processing.
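The row-by-row processing described above can be sketched as a small Python generator. It is an illustration only: `iterate_sorted_groups` is a made-up name, the input is assumed to be already sorted by key, and a `(total, count)` pair plays the role of the buffer row for an average.

```python
def iterate_sorted_groups(sorted_rows):
    """Yield (key, average) from rows that are already sorted by key."""
    current_key = None
    total, count = 0, 0          # buffer for the current group only
    has_group = False
    for key, value in sorted_rows:
        if has_group and key != current_key:
            # The key changed: emit the finished group and reset the buffer.
            yield current_key, total / count
            total, count = 0, 0
        current_key, has_group = key, True
        total += value
        count += 1
    if has_group:
        yield current_key, total / count   # emit the last group

# Hypothetical input, already sorted by group key.
sorted_rows = [("a", 1), ("a", 3), ("b", 10)]
result = list(iterate_sorted_groups(sorted_rows))
print(result)
```

However many groups there are, the generator holds state for a single group, which matches the memory behaviour described in the Memory Management step.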

The sort-based aggregation process is less efficient than hash-based aggregation because it involves the extra step of sorting the data, which is computationally expensive. However, it is more scalable for large datasets or when dealing with immutable types in the aggregation columns that prevent the use of hash-based aggregation.
