Generate the same unique ID for similar records and incrementally load into a Delta table using PySpark

BathiniShirish-2885 60 Reputation points
2024-04-11T10:29:50.7233333+00:00

I have a requirement to read CSV files in a loop and generate the same SNO (sequence number) for rows with matching values across multiple files, then write the result to a Delta table.

Please advise how to handle this use case.

The source data arrives like this:

National_ID  SourceSystem
123          ABC
234          ABC
234          ABC

Generate a sequence for the above data as shown below and load it into Delta:

National_ID  SourceSystem  SequenceNo
123          ABC           1
234          ABC           2
234          ABC           2

Then a new file arrives like this:

National_ID  SourceSystem
234          BBB
123          BBB

Now append this data to the existing Delta table, making sure the sequence numbers come out like this:

National_ID  SourceSystem  SequenceNo
234          BBB           2
123          BBB           1
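In other words, every National_ID keeps one SequenceNo across all files, independent of SourceSystem. The numbering rule can be sketched in plain Python (assign_sequence is a hypothetical helper for illustration, not part of a Spark solution):

```python
def assign_sequence(records, seen=None):
    """Assign a stable SequenceNo per National_ID across batches.

    `seen` carries the National_ID -> SequenceNo mapping from earlier
    loads, so a later file reuses the numbers already issued.
    """
    seen = {} if seen is None else seen
    out = []
    for national_id, source_system in records:
        if national_id not in seen:
            seen[national_id] = len(seen) + 1  # next unused number
        out.append((national_id, source_system, seen[national_id]))
    return out, seen

# First file: 123 -> 1, 234 -> 2 (duplicate 234 rows share the number)
rows1, seen = assign_sequence([("123", "ABC"), ("234", "ABC"), ("234", "ABC")])
# Second file reuses the numbers already assigned to 234 and 123
rows2, seen = assign_sequence([("234", "BBB"), ("123", "BBB")], seen)
```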
Azure Synapse Analytics

An Azure analytics service that brings together data integration, enterprise data warehousing, and big data analytics. Previously known as Azure SQL Data Warehouse.


1 answer

  1. Amira Bedhiafi 41,131 Reputation points Volunteer Moderator
    2024-04-11T12:27:26.8+00:00

You need to generate a unique ID based on your similarity criteria and then write to, or append to, a Delta table while keeping the sequence numbers consistent across incremental loads. Note that in your expected output the SequenceNo is driven by National_ID alone: rows sharing a National_ID get the same number regardless of SourceSystem.

Start by setting up your Spark session with Delta support:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .appName("Unique ID Generation and Incremental Load") \
        .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()
    # Note: 0.8.0 is an older delta-core release; match the version to your Spark build.


Then create a function that reads a CSV file, combines it with any data already in the Delta table, and regenerates the sequence numbers for the combined dataset.

    
    from pyspark.sql.functions import dense_rank
    from pyspark.sql.window import Window
    from delta.tables import DeltaTable

    def read_csv_generate_sequence(csv_path, delta_table_path):
        # Read the CSV file
        df_new = spark.read.csv(csv_path, header=True, inferSchema=True)

        # If the Delta table already exists, combine its rows with the new file
        if DeltaTable.isDeltaTable(spark, delta_table_path):
            df_existing = DeltaTable.forPath(spark, delta_table_path).toDF()
            # Drop the old SequenceNo so the schemas match before the union
            df_combined = df_existing.drop("SequenceNo").unionByName(df_new)
        else:
            df_combined = df_new

        # dense_rank over National_ID gives every row with the same
        # National_ID the same SequenceNo (row_number would number the
        # duplicates 1, 2, ... instead). A window with no partitionBy
        # pulls all rows into a single partition, which is fine for
        # small tables.
        windowSpec = Window.orderBy("National_ID")
        df_combined = df_combined.withColumn("SequenceNo", dense_rank().over(windowSpec))

        # Overwrite the Delta table with the re-sequenced data
        df_combined.write.format("delta").mode("overwrite").save(delta_table_path)

        # Show the updated table content for verification
        spark.read.format("delta").load(delta_table_path).show()

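The window function choice is the crux here: row_number() numbers the rows within a partition 1, 2, 3, ..., so duplicate National_IDs would receive different numbers, whereas dense_rank() over an ordering on National_ID gives every row with the same National_ID the same number. A plain-Python sketch of what dense_rank computes (illustration only, not Spark code):

```python
def dense_rank(values):
    # Rank the distinct values 1..n in sorted order; equal values
    # share a rank, mirroring Spark's dense_rank() over ORDER BY value.
    ranks = {v: i + 1 for i, v in enumerate(sorted(set(values)))}
    return [ranks[v] for v in values]

ids = ["123", "234", "234", "123"]
# dense_rank(ids) -> [1, 2, 2, 1]: both "234" rows share rank 2
```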
Finally, call read_csv_generate_sequence with the path to each CSV file and the location of your Delta table. The second call picks up the existing Delta data, unions it with the incremental file, and reassigns consistent sequence numbers:

    
    csv_path_initial = "/path/to/initial/csv"
    delta_table_path = "/path/to/delta/table"
    read_csv_generate_sequence(csv_path_initial, delta_table_path)

    csv_path_incremental = "/path/to/incremental/csv"
    read_csv_generate_sequence(csv_path_incremental, delta_table_path)
    
    
