Performing a custom update to a Synapse dedicated pool table from Databricks/PySpark

Syed1 Shabeer Ahmed 25 Reputation points
2023-11-13T09:52:09.7133333+00:00

Hi Team,

I have a requirement where I need to read a transformed file and update only a few columns in a Synapse dedicated pool table. The following is the update statement:

UPDATE   <SCHEMA.TABLE>
   SET   col_1 =
            CASE
               WHEN :active_flg = 'Y' THEN :exmpt_rsn_id
               ELSE null
            END,
         col_2 = 
             CASE
               WHEN :active_flg = 'Y' THEN 1
               ELSE 0
            END,
         AUDIT_DTTM = <curr_timestamp>
 WHERE       col_3 = :time_day_id
         AND col_4 = :loc_id
         AND col_5 = '19'

Here :active_flg, :exmpt_rsn_id, :time_day_id and :loc_id come from the transformed file, which will be available in ADLS.

I was thinking of reading the file into a DataFrame, then adding two new columns to the DataFrame with the CASE WHEN logic.

But how can we update the Synapse table from the DataFrame? Please help with a solution approach.

Thanks,

Syed

Azure Synapse Analytics
Azure Databricks

Accepted answer
    Amira Bedhiafi 27,211 Reputation points
    2023-11-13T14:36:09.3233333+00:00

    If I follow your logic, you can use PySpark to read your transformed file from ADLS into a DataFrame and then apply your CASE logic to it using the withColumn method with the when and otherwise functions.

    Then, I assume, you need the JDBC connector to write the DataFrame to a staging table in your Synapse dedicated pool.

    Finally, execute a SQL statement against the dedicated pool to apply the update from the staging table to your target table. This can be a MERGE statement or a combination of UPDATE and JOIN.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import when
    
    spark = SparkSession.builder.appName("SynapseUpdate").getOrCreate()
    
    # Read the transformed file from ADLS (format and path are placeholders)
    df = spark.read.format("your_file_format").load("path_to_your_file_in_ADLS")
    
    # Apply the CASE WHEN logic as two new columns
    df = df.withColumn("col_1", when(df["active_flg"] == "Y", df["exmpt_rsn_id"]).otherwise(None))
    df = df.withColumn("col_2", when(df["active_flg"] == "Y", 1).otherwise(0))
    
    # JDBC connection details for the dedicated SQL pool (placeholders)
    jdbc_url = "jdbc:sqlserver://your_synapse_server.database.windows.net:1433;database=your_database"
    properties = {"user": "your_username", "password": "your_password", "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"}
    
    # Load the DataFrame into a staging table in the dedicated pool
    staging_table_name = "staging_table_for_update"
    df.write.jdbc(url=jdbc_url, table=staging_table_name, mode="overwrite", properties=properties)
    
    # SQL to run on the dedicated pool once the staging load completes;
    # GETDATE() supplies the current timestamp for AUDIT_DTTM
    merge_sql = """
    MERGE INTO target_table AS target
    USING staging_table_for_update AS source
    ON target.col_3 = source.time_day_id AND target.col_4 = source.loc_id AND target.col_5 = '19'
    WHEN MATCHED THEN
        UPDATE SET target.col_1 = source.col_1, target.col_2 = source.col_2, target.AUDIT_DTTM = GETDATE();
    """
    
    
    1 person found this answer helpful.

0 additional answers
