If I follow your logic, you can use PySpark to read your transformed file from ADLS into a DF, then apply your CASE logic to that DF using the withColumn method with the when and otherwise functions.
I assume you need the JDBC connector to write the DF to a staging table in your Synapse dedicated pool.
Finally, execute a SQL query to perform the update from the staging table to your target table. This involves using a MERGE statement or a combination of UPDATE and JOIN statements.
from pyspark.sql import SparkSession
from pyspark.sql.functions import when

spark = SparkSession.builder.appName("SynapseUpdate").getOrCreate()

# Read the transformed file from ADLS into a DataFrame
df = spark.read.format("your_file_format").load("path_to_your_file_in_ADLS")

# CASE logic: keep exmpt_rsn_id (or NULL) and a 1/0 flag depending on active_flg
df = df.withColumn("col_1", when(df["active_flg"] == "Y", df["exmpt_rsn_id"]).otherwise(None))
df = df.withColumn("col_2", when(df["active_flg"] == "Y", 1).otherwise(0))

# JDBC connection details for the Synapse dedicated pool
jdbc_url = "jdbc:sqlserver://your_synapse_server.database.windows.net:1433;database=your_database"
properties = {"user": "your_username", "password": "your_password", "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"}

# Write the DataFrame to a staging table (overwritten on each run)
staging_table_name = "staging_table_for_update"
df.write.jdbc(url=jdbc_url, table=staging_table_name, mode="overwrite", properties=properties)
"""
MERGE INTO target_table AS target
USING staging_table_for_update AS source
ON target.col_3 = source.time_day_id AND target.col_4 = source.loc_id AND target.col_5 = '19'
WHEN MATCHED THEN
UPDATE SET target.col_1 = source.col_1, target.col_2 = source.col_2, target.AUDIT_DTTM = source.curr_timestamp
"""