When performing a bulky join and writing the output as Parquet, below is a screenshot of the failed task.
Output - The estimated size and record count for this task after the Join operation.
Shuffle Read - This can be considered the input for this job, as it represents the amount of data involved in the Join.
Shuffle Spill (Memory) - The amount of RAM consumed by this operation.
Shuffle Spill (Disk) - The amount of data written to disk for this operation. Spilling records to disk is not ideal: it indicates that this stage is processing more data than it has capacity for and has a high chance of failure. During processing, data that does not fit into the RAM available to the container is written to disk in compressed format.
Below is the ideal case:
Solution 1:
set spark.sql.shuffle.partitions
eg:
spark.conf.set("spark.sql.shuffle.partitions", 200)
The parameter that controls the parallelism of a Join or Aggregate operation that results from a shuffle is spark.sql.shuffle.partitions. The default of 200 comes from real-world experience and was found to be a reasonable starting point, but in practice it is rarely the right value for a specific workload.
When dealing with small amounts of data, you should usually reduce the number of shuffle partitions; otherwise you end up with many partitions holding only a few entries each, which underutilizes the executors and increases the time it takes to transfer data over the network from executor to executor.
On the other hand, when you have too much data and too few partitions, fewer tasks are processed per executor, which increases the load on each individual executor and often leads to memory errors. Also, if a partition grows larger than the memory available in the executor, you will get disk spills. Spills are among the slowest things a job can do: during a disk spill, Spark moves part of its working set from RAM to disk when it does not fit in memory, which lets a Spark job run on data of any size. Even though a spill won't break your pipeline, it makes the job very inefficient because of the additional overhead of disk I/O and increased garbage collection.
Therefore, spark.sql.shuffle.partitions is one of the most frequently configured parameters when working with Spark.
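As a rough sketch of how one might size this parameter: the helper below is illustrative (not a Spark API), and the ~128 MB-per-partition target is a common rule of thumb, not a hard rule.

```scala
// Illustrative helper (not a Spark API): pick a shuffle partition count so
// that each partition stays near a target size. ~128 MB is a common rule of thumb.
def shufflePartitionsFor(shuffleBytes: Long,
                         targetBytesPerPartition: Long = 128L * 1024 * 1024): Int =
  math.max(1, math.ceil(shuffleBytes.toDouble / targetBytesPerPartition).toInt)

// e.g. ~50 GB of shuffle data -> 400 partitions of ~128 MB each
val partitions = shufflePartitionsFor(50L * 1024 * 1024 * 1024)
// spark.conf.set("spark.sql.shuffle.partitions", partitions)
```

The shuffle size can be read off the Shuffle Read column in the Yarn UI for the failing stage.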
Solution 2: Divide and Conquer Approach
This is typically needed when the data being handled is larger than the allocated queue size. I faced this case at one of the places I worked, where I had to process and ingest huge sets of data, but the queue size allocated to my team was smaller than the data.
Not to worry: in such cases we can divide the data into parts and then insert.
Eg: A case where we read data from 25 tables (either Hive or RDBMS), join all of them, and write the result to MongoDB, with a record count of 500 million. The max memory limit per executor in the queue is 15 GB, and this job fails at the join phase.
So, divide the data into 10 equal parts, perform the joins on 50 million records at a time, and ingest to MongoDB. Below are the steps to do this:
1. Create a temp table with only the primary keys and sort the keys in descending order.
2. Get the Min and Max of the primary keys and calculate the incremental offset by dividing the range by numParts, which is 10 in this case.
Eg:
val numParts = 10
val incrementalOffset = (maxPrimaryKeyVal - minPrimaryKeyVal) / numParts
Now incrementalOffset = ~50M
3. Do a broadcast join between this temp table of partitioned keys and the main table. On doing this, only about 50 million of the main table's records are involved in the transformation per part.
The reason for doing a broadcast join is that the temp table holds only the primary keys, so even for millions of records its size would be in MBs, and it is safe to broadcast.
If the primary key is not numeric (which happens with Hive tables), pick any other unique key, or in the worst case create a Row_Number over the sorted records. All that is needed is a sorted set of rows that can be divided.
Below is the sample code, which can be extended.
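A sketch of steps 1-3 above: the key bounds and the commented-out Spark calls are illustrative assumptions; only the window arithmetic is concrete.

```scala
// Illustrative divide-and-conquer loop. minKey/maxKey would come from the
// temp table of primary keys; the Spark calls are shown as comments only.
val numParts = 10
val minKey = 0L            // assumed: SELECT MIN(pk) FROM temp_keys
val maxKey = 500000000L    // assumed: SELECT MAX(pk) FROM temp_keys
val incrementalOffset = (maxKey - minKey) / numParts

for (part <- 0 until numParts) {
  val lower = minKey + part * incrementalOffset
  // make the last window end at maxKey so no trailing keys are dropped
  val upper = if (part == numParts - 1) maxKey else lower + incrementalOffset - 1
  // val keySlice = tempKeys.filter(col("pk").between(lower, upper))
  // val slice    = mainTable.join(broadcast(keySlice), "pk")
  // ... run the remaining joins on `slice` and write this part to MongoDB
  println(s"part $part: keys $lower to $upper")
}
```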
In this way, define a window and loop through the split data to complete the operation. This approach is helpful when the transformation logic cannot be split but the data can be.
Another approach is to split the set of tables being joined into parts.
Eg: In this case we have 25 tables, so join 5 tables at a time.
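A sketch of that batching: the table names and the commented-out join/write calls are hypothetical; the grouping itself is the point.

```scala
// Hypothetical: split 25 input tables into batches of 5, join each batch
// separately, then join the 5 intermediate results at the end.
val tableNames = (1 to 25).map(i => s"table_$i")
val batches = tableNames.grouped(5).toList   // 5 batches of 5 tables each

for (batch <- batches) {
  // val joined = batch.map(spark.table).reduce(_.join(_, "key"))
  // joined.write.parquet(s"/tmp/intermediate/${batch.head}")
  println(s"joining: ${batch.mkString(", ")}")
}
```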
Solution 3:
RepartitionByRange
Consider a case where the data is skewed: say we have a set of 10,000 records stored in 3 partitions, and partition 1 holds 7,000 records while the other 2 partitions hold the remaining 3,000. In other words, the data is not evenly distributed among the partitions.
To distribute the data evenly among all the partitions, use the repartitionByRange API. This function takes 2 input parameters: numPartitions and the column name(s) by which the data is to be divided.
Eg: repartitionByRange(5, col("AccountId"))
For a worked example, refer to https://www.waitingforcode.com/apache-spark-sql/range-partitioning-apache-spark-sql/read
The same can be achieved via bucketBy(), but repartitionByRange gives better results.
Understanding DAG:
For the above join operation, where data from 20 tables stored as Parquet is joined to form a single set: in the Yarn UI, click on SQL in the pane. Below is the screenshot.
Level 1 [Scan Parquet] - Indicates that the data is read from 3 Parquet files as 3 tables.
Level 2 [Exchange] - As the data in HDFS or S3 is distributed among multiple nodes, shuffle happens here; this is termed Exchange.
Level 3 [Sort Merge Join] - In this stage Table 1 is joined with Table 2, and Table 2 is joined with Table 3. Finally we will have 2 datasets after this stage.
Going further, in a similar fashion, all the tables would be joined.
Data Skewness :
https://selectfrom.dev/spark-performance-tuning-skewness-part-2-9f50c765a87e
https://www.youtube.com/watch?v=HIlfO1pGo0w




