Upsert Data in SQL Server Using a Synapse Notebook and PySpark

Aditya Singh 160 Reputation points
2024-07-17T12:14:43.41+00:00

How do I use PySpark in a Synapse notebook to upsert data into SQL Server? I am able to read the table with the following code:

df = spark.read.jdbc(url=jdbc_url, table="Dim.table_name", properties=properties)

But I am not sure how to upsert data based on key columns in SQL Server.
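
For reference, a typical jdbc_url and properties setup for a read like the one above might look as follows (the server, database, and credentials are placeholders, and SQL authentication is assumed):

jdbc_url = "jdbc:sqlserver://yourserver.database.windows.net:1433;databaseName=yourdb"
properties = {
    "user": "your_sql_user",          # placeholder credentials
    "password": "your_sql_password",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}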


1 answer

  1. Amira Bedhiafi 25,491 Reputation points
    2024-07-17T21:23:04.9966667+00:00

    You can use the MERGE statement, but note that spark.sql() runs against the Spark catalog, not SQL Server, so a MERGE targeting Dim.table_name cannot be executed from Spark SQL, and writing back with mode="overwrite" would replace the whole target table with only the new rows. A common pattern instead is to stage the incoming batch in SQL Server over JDBC and then run the MERGE on SQL Server itself through a JDBC connection. The staging table name below is an example, and this assumes your properties dict holds "user" and "password" keys:

    # Prepare your new/updated data
    # (this is just an example, replace with your actual new data)
    new_data = [("1", "John", "Doe", "30"), ("2", "Jane", "Smith", "25"), ("3", "New", "Person", "35")]
    columns = ["id", "first_name", "last_name", "age"]
    new_df = spark.createDataFrame(new_data, columns)

    # Step 1: stage the batch in SQL Server; overwrite is safe here because
    # the staging table holds only this batch
    new_df.write.jdbc(url=jdbc_url, table="Dim.table_name_staging", mode="overwrite", properties=properties)

    # Step 2: merge the staging table into the target, keyed on id
    merge_stmt = """
        MERGE INTO Dim.table_name AS target
        USING Dim.table_name_staging AS source
        ON target.id = source.id
        WHEN MATCHED THEN
            UPDATE SET
                target.first_name = source.first_name,
                target.last_name = source.last_name,
                target.age = source.age
        WHEN NOT MATCHED THEN
            INSERT (id, first_name, last_name, age)
            VALUES (source.id, source.first_name, source.last_name, source.age);
    """

    # Execute the MERGE on SQL Server over a JDBC connection from the driver
    driver_manager = spark.sparkContext._jvm.java.sql.DriverManager
    conn = driver_manager.getConnection(jdbc_url, properties["user"], properties["password"])
    try:
        stmt = conn.createStatement()
        stmt.executeUpdate(merge_stmt)
        stmt.close()
    finally:
        conn.close()

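    If your Spark pool has pyodbc available (it may need to be installed on the pool), an alternative to the JVM JDBC connection is to run the same MERGE through pyodbc; the staging step stays unchanged. A minimal sketch, assuming SQL authentication and placeholder server/database names:

    import pyodbc

    conn = pyodbc.connect(
        "DRIVER={ODBC Driver 18 for SQL Server};"
        "SERVER=yourserver.database.windows.net;"  # placeholder server
        "DATABASE=yourdb;UID=your_sql_user;PWD=your_sql_password"
    )
    cursor = conn.cursor()
    cursor.execute(merge_stmt)  # same MERGE statement as above
    conn.commit()
    conn.close()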
