Performing a custom update to a Synapse dedicated pool table from Databricks/PySpark

Syed1 Shabeer Ahmed 25 Reputation points

Hi Team,

I have a requirement where I need to read a transformed file and update only a few columns in a Synapse dedicated pool table. The following is the update statement:

   SET   col_1 = CASE
                   WHEN :active_flg = 'Y' THEN :exmpt_rsn_id
                   ELSE NULL
                 END,
         col_2 = CASE
                   WHEN :active_flg = 'Y' THEN 1
                   ELSE 0
                 END,
         AUDIT_DTTM = <curr_timestamp>
   WHERE     col_3 = :time_day_id
         AND col_4 = :loc_id
         AND col_5 = '19'

Here :active_flg, :exmpt_rsn_id, :time_day_id and :loc_id are coming from the transformed file, which would be available in ADLS.

I was thinking of reading the file into a dataframe, then adding two new columns to the dataframe with the CASE WHEN logic.

But how can we update the Synapse table from the dataframe? Please help with a solution approach.



Azure Synapse Analytics
An Azure analytics service that brings together data integration, enterprise data warehousing, and big data analytics. Previously known as Azure SQL Data Warehouse.
Azure Databricks
An Apache Spark-based analytics platform optimized for Azure.

Accepted answer
  1. Amira Bedhiafi 8,071 Reputation points

    If I follow your logic, you can use PySpark to read your transformed file from ADLS into a DataFrame, then apply your CASE logic to that DataFrame using the withColumn method with the when and otherwise functions.

    I assume you need the JDBC connector to write the DF to a staging table in your Synapse dedicated pool.
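    As an alternative to plain JDBC (my suggestion, not part of the original question), Databricks also ships a dedicated Azure Synapse connector (format `""`) that stages the data through an ADLS folder for faster bulk loads. A minimal sketch, where the URL, tempDir and table name are all placeholders you would replace with your own values:

    ```python
    # Sketch: write a DataFrame to a Synapse staging table using the
    # Databricks Azure Synapse connector instead of plain JDBC.
    # All option values are placeholders, not working credentials.
    def write_to_synapse_staging(df, jdbc_url, temp_dir, table_name):
        (df.write
           .format("")                # Databricks Synapse connector
           .option("url", jdbc_url)                         # JDBC URL of the dedicated pool
           .option("tempDir", temp_dir)                     # ADLS staging path, e.g. abfss://...
           .option("forwardSparkAzureStorageCredentials", "true")
           .option("dbTable", table_name)                   # staging table to (over)write
           .mode("overwrite")
           .save())
    ```

    The connector requires a storage account it can use as a scratch area (tempDir), which is why plain JDBC is simpler to set up for small volumes.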

    Then execute a SQL statement on the dedicated pool to perform the update from the staging table into your target table, using either a MERGE statement or a combination of UPDATE and JOIN.
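    For the UPDATE-with-JOIN variant, note that the dedicated SQL pool does not accept ANSI JOIN syntax inside UPDATE, so the two tables are correlated in the WHERE clause instead. A sketch, with the table and column names taken from the question (adjust to your schema):

    ```sql
    -- UPDATE from a staging table in a dedicated SQL pool:
    -- correlate target and source in the WHERE clause, not with JOIN ... ON
    UPDATE target_table
    SET    col_1 = source.col_1,
           col_2 = source.col_2,
           AUDIT_DTTM = GETDATE()
    FROM   staging_table_for_update AS source
    WHERE  target_table.col_3 = source.time_day_id
      AND  target_table.col_4 = source.loc_id
      AND  target_table.col_5 = '19';
    ```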

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import when

    spark = SparkSession.builder.appName("SynapseUpdate").getOrCreate()

    # Read the transformed file from ADLS (replace the format and path with yours)
    df ="your_file_format").load("path_to_your_file_in_ADLS")

    # Apply the CASE WHEN logic from the question
    df = df.withColumn("col_1", when(df["active_flg"] == "Y", df["exmpt_rsn_id"]).otherwise(None))
    df = df.withColumn("col_2", when(df["active_flg"] == "Y", 1).otherwise(0))

    # Write the DataFrame to a staging table in the dedicated pool over JDBC
    jdbc_url = "jdbc:sqlserver://<your_server>;database=<your_database>"
    properties = {"user": "your_username", "password": "your_password", "driver": ""}
    staging_table_name = "staging_table_for_update"
    df.write.jdbc(url=jdbc_url, table=staging_table_name, mode="overwrite", properties=properties)

    Then run the MERGE on the dedicated pool to update the target table from the staging table:

    MERGE INTO target_table AS target
    USING staging_table_for_update AS source
        ON target.col_3 = source.time_day_id
       AND target.col_4 = source.loc_id
       AND target.col_5 = '19'
    WHEN MATCHED THEN
        UPDATE SET target.col_1 = source.col_1,
                   target.col_2 = source.col_2,
                   target.AUDIT_DTTM = GETDATE();
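    The MERGE itself has to execute on the dedicated pool, not in Spark. One way to trigger it from the same notebook (my assumption, not shown in the original answer) is pyodbc, which must be installed on the cluster; the connection string below is a placeholder. A sketch:

    ```python
    # Sketch: build the MERGE statement and run it on the dedicated pool via pyodbc.
    merge_sql = """
    MERGE INTO target_table AS target
    USING staging_table_for_update AS source
        ON target.col_3 = source.time_day_id
       AND target.col_4 = source.loc_id
       AND target.col_5 = '19'
    WHEN MATCHED THEN
        UPDATE SET target.col_1 = source.col_1,
                   target.col_2 = source.col_2,
                   target.AUDIT_DTTM = GETDATE();
    """

    def run_merge(conn_str, sql=merge_sql):
        import pyodbc  # imported lazily so the sketch loads even without the ODBC driver
        with pyodbc.connect(conn_str, autocommit=True) as conn:
            conn.execute(sql)

    # Example (placeholder connection string):
    # run_merge("Driver={ODBC Driver 18 for SQL Server};;Database=...;Uid=...;Pwd=...")
    ```

    Running the MERGE as a post-load step keeps the whole flow (read, transform, stage, update) inside one notebook.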
    1 person found this answer helpful.
