You need to generate a unique ID based on your similarity criteria (in your case, National_ID and SourceSystem), and then write or append the result to a Delta table while ensuring the sequence numbers are correctly maintained across incremental loads.
Start by setting up your Spark session with Delta Lake support:
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session with the Delta Lake package, SQL
# extension and catalog implementation configured.
# NOTE(review): the delta-core artifact/version must match the Spark and Scala
# versions actually installed -- confirm before running.
spark = (
    SparkSession.builder
    .appName("Unique ID Generation and Incremental Load")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0")
    .config(
        "spark.sql.extensions",
        "io.delta.sql.DeltaSparkSessionExtension",
    )
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)
Then create a function that reads a CSV file, checks for existing sequence numbers in the Delta table, and generates new sequence numbers for the incoming data.
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
from delta.tables import *
def read_csv_generate_sequence(csv_path, delta_table_path):
    """Load a CSV file into a Delta table, numbering rows per identity group.

    Rows are grouped by (National_ID, SourceSystem) and each incoming row is
    given a ``SequenceNo`` that continues from the highest ``SequenceNo``
    already stored for its group (or starts at 1 for a group not yet in the
    table). Rows already in the table are never renumbered or rewritten; the
    new rows are appended.

    Args:
        csv_path: Path of the CSV file to load (read with a header row and
            schema inference).
        delta_table_path: Path of the Delta table to create or append to.

    Returns:
        None. The table contents are printed with ``show()`` for verification.
    """
    # Local import: max/coalesce/lit are needed here but the file-level import
    # only brings in ``col`` and ``row_number``.
    from pyspark.sql import functions as F

    key_cols = ["National_ID", "SourceSystem"]

    # Read the incoming batch.
    df_new = spark.read.csv(csv_path, header=True, inferSchema=True)
    output_cols = df_new.columns + ["SequenceNo"]

    # Number the incoming rows 1..n inside each group. All rows of a group tie
    # on the ordering key, so the order among them is arbitrary.
    window_spec = Window.partitionBy(*key_cols).orderBy("National_ID")
    df_numbered = df_new.withColumn(
        "SequenceNo", row_number().over(window_spec)
    )

    if DeltaTable.isDeltaTable(spark, delta_table_path):
        df_existing = DeltaTable.forPath(spark, delta_table_path).toDF()

        # Highest sequence number already issued for each group. The key
        # columns are renamed so the join condition below is unambiguous.
        df_offsets = df_existing.groupBy(*key_cols).agg(
            F.max("SequenceNo").alias("_max_seq")
        )
        for key in key_cols:
            df_offsets = df_offsets.withColumnRenamed(key, "_prev_" + key)

        # Null-safe equality so rows with a NULL key still find their group,
        # matching how partitionBy/groupBy treat NULLs.
        same_group = [
            F.col(key).eqNullSafe(F.col("_prev_" + key)) for key in key_cols
        ]

        # Shift the new numbers past the existing maximum; groups not yet in
        # the table have no match and keep their 1..n numbering.
        df_numbered = df_numbered.join(
            df_offsets, on=same_group, how="left"
        ).withColumn(
            "SequenceNo",
            F.col("SequenceNo") + F.coalesce(F.col("_max_seq"), F.lit(0)),
        )
        write_mode = "append"
    else:
        # First load: nothing to continue from, so create the table.
        write_mode = "overwrite"

    # Drop the helper columns and write only the new rows.
    df_numbered.select(*output_cols).write.format("delta").mode(
        write_mode
    ).save(delta_table_path)

    # Show the updated table content for verification.
    spark.read.format("delta").load(delta_table_path).show()
Finally, call the function read_csv_generate_sequence with the path to each CSV file and the location of your Delta table:
# Source files and the target Delta table location.
delta_table_path = "/path/to/delta/table"
csv_path_initial = "/path/to/initial/csv"
csv_path_incremental = "/path/to/incremental/csv"

# Load the initial file first, then the incremental one, into the same table.
for csv_path in (csv_path_initial, csv_path_incremental):
    read_csv_generate_sequence(csv_path, delta_table_path)