Azure Synapse Analytics
An Azure analytics service that brings together data integration, enterprise data warehousing, and big data analytics. Previously known as Azure SQL Data Warehouse.
How do I use PySpark in a Synapse notebook to upsert data in SQL Server? I am able to read the table with the following code:
df = spark.read.jdbc(url=jdbc_url, table="Dim.table_name", properties=properties)
But I am not sure how to upsert data based on key columns in SQL Server.
You can use SQL Server's MERGE statement. One thing to keep in mind: Spark SQL cannot run MERGE against a table accessed over JDBC (its MERGE INTO only works on sources such as Delta tables), so the usual pattern is to stage the new rows in SQL Server first and then run the MERGE on the SQL Server side:
# (Optional) read the existing data if you want to inspect it first;
# it is not needed for the upsert itself
existing_df = spark.read.jdbc(url=jdbc_url, table="Dim.table_name", properties=properties)

# Prepare your new/updated data
# (this is just an example, replace with your actual new data)
new_data = [("1", "John", "Doe", "30"), ("2", "Jane", "Smith", "25"), ("3", "New", "Person", "35")]
columns = ["id", "first_name", "last_name", "age"]
new_df = spark.createDataFrame(new_data, columns)
# Construct the T-SQL MERGE statement. The staging table is the source and
# the key column (id) drives the match; "Dim.table_name_staging" below is
# just an example name - use whatever staging table you create.
merge_stmt = """
MERGE INTO Dim.table_name AS target
USING Dim.table_name_staging AS source
ON target.id = source.id
WHEN MATCHED THEN
    UPDATE SET
        target.first_name = source.first_name,
        target.last_name = source.last_name,
        target.age = source.age
WHEN NOT MATCHED THEN
    INSERT (id, first_name, last_name, age)
    VALUES (source.id, source.first_name, source.last_name, source.age);
"""
# Do not call spark.sql(merge_stmt): the target is a SQL Server table, not a
# table in the Spark catalog, and Spark's own MERGE INTO only supports sources
# such as Delta tables. The statement has to be executed on SQL Server itself
# (see the sketch after the code).
# Write the new data to a staging table in SQL Server. Do NOT write to the
# target table with mode="overwrite" - that would replace its entire contents.
new_df.write \
    .jdbc(url=jdbc_url, table="Dim.table_name_staging", mode="overwrite", properties=properties)
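To actually run merge_stmt from the notebook you need a direct connection to SQL Server, since Spark only reads and writes over JDBC. Below is a minimal sketch using java.sql.DriverManager through the notebook's JVM (via Py4J); it assumes your properties dictionary contains "user" and "password" keys and reuses the same jdbc_url you already use for reads:

# Minimal sketch: open a JDBC connection to SQL Server from the notebook's JVM
# and execute the MERGE as a single statement. Assumes properties has
# "user" and "password" keys - adjust to however you authenticate.
driver_manager = spark.sparkContext._gateway.jvm.java.sql.DriverManager
conn = driver_manager.getConnection(jdbc_url, properties["user"], properties["password"])
try:
    stmt = conn.createStatement()
    stmt.executeUpdate(merge_stmt)  # runs the upsert inside SQL Server
    stmt.close()
finally:
    conn.close()

Run this after the staging write above so the source rows are already in place. An alternative is pyodbc, if the ODBC driver is available on your Spark pool, but the DriverManager approach has the advantage of reusing the JDBC driver and credentials you already use for reads and writes.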