Errors when applying a non-equi left join (>=) to join multiple tables to a single table in a data flow activity

Min Xuan Lim 0 Reputation points
2024-08-21T14:06:37.54+00:00

Hi, I am trying to use non-equi left joins to join 3 tables into one in a data flow. Each table contains over 20 million rows of data, and one of them contains 400 million rows. I get the following errors when executing the data flow:

[screenshots of the error messages]

The second flow is the main flow; it joins the other sources to the second imported source using the non-equi left join method.

[screenshot of the data flow]

Here is the join method I am using:

[screenshot of the join transformation settings]

I selected the 'Both' option in the broadcast settings; the execution also failed with the same error when I selected 'Right' instead.

I would appreciate any suggestions to resolve this error. Thank you.

Azure Data Factory
An Azure service for ingesting, preparing, and transforming data at scale.

1 answer

  1. Amira Bedhiafi 34,336 Reputation points Volunteer Moderator
    2024-08-21T18:10:44.43+00:00

    Based on this old thread:

    It seems the problem is that the amount of data you are trying to pull back to your driver is too large. Most likely you are using the collect method to retrieve all values from a DataFrame/RDD. The driver is a single process, and by collecting a DataFrame you pull all of the data you had distributed across the cluster back to one node. This defeats the purpose of distributing it! It only makes sense to do this after you have reduced the data down to a manageable amount.

    1. Increase spark.driver.maxResultSize

    • This setting controls the maximum total size of serialized results that can be sent back to the driver. The default is 1 GB in open-source Spark (some managed runtimes raise it, e.g. to 4 GB), but you can increase it if your driver has sufficient memory. For example:
      
           spark.conf.set("spark.driver.maxResultSize", "8g")
      
      
      • Caution: Setting this value too high can lead to out-of-memory errors on the driver node.
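
      As a rough, hedged sketch: spark.driver.maxResultSize is a driver property, so in a standalone PySpark job it is usually supplied when the session is created rather than with spark.conf.set at runtime. Whether you can pass such settings to the Spark cluster that backs an ADF data flow depends on your integration runtime, so treat this as illustrative only (the app name and sizes are assumptions):

           from pyspark.sql import SparkSession

           # Illustrative session setup for a standalone PySpark job.
           spark = (
               SparkSession.builder
               .appName("large-nonequi-join")
               .config("spark.driver.maxResultSize", "8g")   # raise the result-size cap
               .config("spark.driver.memory", "16g")         # driver needs room to hold it
               .getOrCreate()
           )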

    2. Avoid Collecting Large Datasets on the Driver

    • The main issue arises when the driver tries to collect too much data from the executors. To avoid this:
      • Perform Aggregations on the Executors: Instead of collecting large datasets to the driver, perform as much filtering, aggregation, and transformation on the executors as possible.
      • Limit the Data: If you need to collect data on the driver, ensure you're only collecting a small subset (e.g., using .limit(100)) instead of the entire dataset.
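
      A minimal sketch of that idea, using small made-up DataFrames (the table and column names are assumptions, not the asker's schema): the non-equi condition is evaluated across the executors, and only a small aggregate ever reaches the driver.

           from pyspark.sql import SparkSession, functions as F

           spark = SparkSession.builder.getOrCreate()

           # Tiny stand-ins for the real tables.
           events = spark.createDataFrame(
               [(1, 10), (1, 25), (2, 7)], ["customer_id", "event_time"]
           )
           windows = spark.createDataFrame(
               [(1, 20), (2, 5)], ["customer_id", "valid_from"]
           )

           # Non-equi (>=) left join, executed on the executors.
           joined = events.join(
               windows,
               (events.customer_id == windows.customer_id)
               & (events.event_time >= windows.valid_from),
               "left",
           )

           # Aggregate first; only one row per customer is collected to the driver.
           summary = joined.groupBy(events.customer_id).agg(F.count("*").alias("row_count"))
           result = summary.limit(100).collect()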

    3. Optimize the Join Strategy

    • Reduce the Size of Broadcast Data: If you're using broadcast joins, check the size of the data being broadcast. You can reduce the threshold for broadcasting by setting spark.sql.autoBroadcastJoinThreshold to a lower value, which forces Spark to use a shuffle join instead:
      
           spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
      
      
      • Repartition the Data: Ensure that the data is well-partitioned before the join operation. Too many partitions add scheduling and shuffle overhead, while too few produce oversized tasks that can run out of memory:
        
             df = df.repartition(100)  # Example: repartition to 100 partitions
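
        Putting both ideas together for the question's scenario, a hedged sketch (reusing the hypothetical events and windows DataFrames from the earlier sketch, and assuming the join has at least one equality key alongside the >= condition). Note that, as far as I know, a left outer join with no equality predicate at all can only run as a broadcast nested-loop join in Spark, which is often why such joins fail at this scale:

           # Disable auto-broadcast so Spark shuffles both sides instead of trying
           # to copy a 20M+ row table to every executor.
           spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

           # Spread the shuffle evenly by repartitioning on the equality key first.
           events_r = events.repartition(200, "customer_id")
           windows_r = windows.repartition(200, "customer_id")

           joined = events_r.join(
               windows_r,
               (events_r.customer_id == windows_r.customer_id)
               & (events_r.event_time >= windows_r.valid_from),
               "left",
           )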
        
        

    4. Persist Intermediate Data

    • Persist or Cache Intermediate DataFrames: If your data flow involves multiple joins and transformations, consider persisting intermediate DataFrames to disk or memory:
      
           from pyspark import StorageLevel

           df1 = df1.persist(StorageLevel.MEMORY_AND_DISK)
      
           df2 = df2.persist(StorageLevel.MEMORY_AND_DISK)
      
      
      • This can reduce the repeated shuffling of data between stages.
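
      A small sketch of the caching lifecycle (persist is lazy, so nothing is cached until an action runs; df1 here stands in for any intermediate DataFrame):

           from pyspark import StorageLevel

           df1 = df1.persist(StorageLevel.MEMORY_AND_DISK)
           df1.count()        # an action materialises the cache

           # ... run the joins that reuse df1 ...

           df1.unpersist()    # release memory/disk once it is no longer needed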

    5. Consider Using Parquet or Delta Formats

    • Save and Load Data Efficiently: Instead of trying to process all the data in one go, consider saving intermediate results to Parquet or Delta format files and then loading them as needed for the next stage of processing:
      
           df.write.parquet("path_to_save")
      
           df = spark.read.parquet("path_to_save")
      
      
      • This approach leverages Spark's efficient I/O operations.
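
      For example, a hedged sketch of staging one join at a time to ADLS Gen2 (the storage path and column names are placeholders, not values from the question):

           # Stage the first join's output, then continue from the staged copy.
           stage_path = "abfss://container@account.dfs.core.windows.net/staging/join1"

           joined.write.mode("overwrite").parquet(stage_path)

           # The next stage reads only the columns it actually needs.
           stage1 = spark.read.parquet(stage_path).select("customer_id", "event_time", "valid_from")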

    6. Monitor the Number of Partitions

    • Reduce Number of Partitions: Too many shuffle partitions can also contribute to this issue, since each task sends its result back to the driver and the total grows with the partition count. Monitor the partitioning and adjust accordingly:
      
           spark.conf.set("spark.sql.shuffle.partitions", "200")
      
      
      • Experiment with this setting to find the optimal number based on your cluster size and data volume.
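
      A quick way to check before tuning (a common rule of thumb, not a hard rule, is a few tasks per executor core):

           # Inspect current partition counts before and after the join.
           print(events.rdd.getNumPartitions(), windows.rdd.getNumPartitions())
           print(joined.rdd.getNumPartitions())   # reflects spark.sql.shuffle.partitions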

    7. Consider Using a Cluster with More Resources

    • If you're consistently running into resource limits, it may be necessary to scale up your cluster by adding more worker nodes or increasing the memory and CPU of the existing nodes. For an Azure Data Factory data flow, that means increasing the compute size and core count of the Azure Integration Runtime used by the data flow activity.
