Generate a shared unique ID for similar records and incrementally load into a Delta table using PySpark

Bathini, Shirish 20 Reputation points
2024-04-11T10:29:50.7233333+00:00

I have a requirement to read CSV files in a loop. I am trying to generate the same SNO (sequence number) for records with similar values across multiple files, then write the result to a Delta table.

Please help me handle this use case.

The source data comes like this:

National_ID  SourceSystem
123          ABC
234          ABC
234          ABC

Generate a sequence number for the above data as shown below and load it into Delta:

National_ID  SourceSystem  SequenceNo
123          ABC           1
234          ABC           2
234          ABC           2

Then a new file comes like this:

National_ID  SourceSystem
234          BBB
123          BBB

Now append this data to the existing Delta table and make sure the sequence numbers come out like this:

National_ID  SourceSystem  SequenceNo
234          BBB           2
123          BBB           1
Azure Synapse Analytics

1 answer

  1. Amira Bedhiafi 24,711 Reputation points
    2024-04-11T12:27:26.8+00:00

    You need to generate a unique ID based on your similarity criteria (in your case effectively National_ID alone, since your expected output gives the same SequenceNo to the same National_ID regardless of SourceSystem), and then write to or append to a Delta table while making sure the sequence numbers stay consistent across incremental loads.
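    Before reaching for Spark, the numbering rule itself can be sketched in plain Python to confirm it matches your sample data. This is only an illustration (the function name and signature are made up, not part of any library): a persistent mapping from National_ID to its sequence number, where unseen IDs get the next free number and repeats reuse their existing one.

```python
def assign_sequences(rows, existing=None):
    """Assign a stable SequenceNo per National_ID.

    rows: list of (national_id, source_system) tuples.
    existing: dict mapping national_id -> previously assigned SequenceNo.
    Returns (rows with SequenceNo appended, updated mapping).
    """
    seq_by_id = dict(existing or {})
    out = []
    for national_id, source_system in rows:
        if national_id not in seq_by_id:
            # Unseen ID: give it the next free sequence number
            seq_by_id[national_id] = len(seq_by_id) + 1
        out.append((national_id, source_system, seq_by_id[national_id]))
    return out, seq_by_id

# First file: 123 -> 1, 234 -> 2 (shared by both 234 rows)
first, mapping = assign_sequences([("123", "ABC"), ("234", "ABC"), ("234", "ABC")])
# Second file reuses the mapping, so 234 keeps 2 and 123 keeps 1
second, mapping = assign_sequences([("234", "BBB"), ("123", "BBB")], mapping)
```

    The Spark version below achieves the same effect by renumbering the full combined dataset on each load instead of carrying a mapping between runs.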

    Start by setting up your Spark session with Delta support:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .appName("Unique ID Generation and Incremental Load") \
        .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()
    

    Then create a function that reads a CSV file, combines it with any data already in the Delta table, and regenerates the sequence numbers for the combined data.

    
    from pyspark.sql.functions import dense_rank
    from pyspark.sql.window import Window
    from delta.tables import DeltaTable

    def read_csv_generate_sequence(csv_path, delta_table_path):
        # Read the incoming CSV file
        df_new = spark.read.csv(csv_path, header=True, inferSchema=True)

        # If the Delta table already exists, combine its rows with the new
        # data. Drop the previously generated SequenceNo column first so the
        # schemas match for unionByName.
        if DeltaTable.isDeltaTable(spark, delta_table_path):
            delta_table = DeltaTable.forPath(spark, delta_table_path)
            df_existing = delta_table.toDF().drop("SequenceNo")
            df_combined = df_existing.unionByName(df_new)
        else:
            df_combined = df_new

        # dense_rank over National_ID assigns the same SequenceNo to every
        # row with the same National_ID, matching the expected output.
        # Note: a window without partitionBy pulls all rows into a single
        # partition, which is fine for small data but does not scale.
        windowSpec = Window.orderBy("National_ID")
        df_combined = df_combined.withColumn("SequenceNo", dense_rank().over(windowSpec))

        # Rewrite the Delta table with the combined, renumbered data
        df_combined.write.format("delta").mode("overwrite").save(delta_table_path)

        # Show the updated table content for verification
        spark.read.format("delta").load(delta_table_path).show()
    

    Finally, call read_csv_generate_sequence with the path to each CSV file and the location of your Delta table:

    
    csv_path_initial = "/path/to/initial/csv"
    delta_table_path = "/path/to/delta/table"
    read_csv_generate_sequence(csv_path_initial, delta_table_path)

    csv_path_incremental = "/path/to/incremental/csv"
    read_csv_generate_sequence(csv_path_incremental, delta_table_path)
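    Since the function overwrites the table and recomputes SequenceNo from scratch on each run, it is worth checking that renumbering the combined data still reproduces the sequences from the question. A plain-Python equivalent of dense_rank over Window.orderBy("National_ID") (illustrative only, no Spark required):

```python
def dense_rank_by_id(rows):
    """Mimic dense_rank() over Window.orderBy('National_ID'): rows with
    equal National_ID share one rank, and ranks have no gaps."""
    ids = sorted({national_id for national_id, _ in rows})
    rank = {national_id: i + 1 for i, national_id in enumerate(ids)}
    return [(nid, src, rank[nid]) for nid, src in rows]

# All five rows after both loads: 123 -> 1 and 234 -> 2 for every
# source system, as in the question's expected output
combined = [("123", "ABC"), ("234", "ABC"), ("234", "ABC"),
            ("234", "BBB"), ("123", "BBB")]
result = dense_rank_by_id(combined)
```

    One caveat of this renumbering-from-scratch design: because ranks follow the sort order of National_ID rather than first-seen order, a later file introducing a National_ID that sorts before an existing one would shift previously assigned numbers. If sequence numbers must never change once assigned, the persistent-mapping approach is safer.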
    
    
