Read data from SQL Server and update via Synapse Spark pool

Aditya Singh 160 Reputation points
2024-03-08T13:39:29.99+00:00

In the following manner, I am reading a table from SQL Server using a Synapse Spark pool:
[screenshot of notebook code]

How can I update the data from the Synapse Spark pool, e.g. update a column?
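
(The screenshot is not available; as a point of reference, a minimal sketch of such a JDBC read from a Spark pool, with placeholder server, table, and credential values, looks roughly like this:)

# Sketch of a JDBC read from a Synapse Spark pool; all values are placeholders
df = (spark.read.format("jdbc")
    .option("url", "jdbc:sqlserver://<server>:1433;databaseName=<database>")
    .option("dbtable", "<schema>.<table>")
    .option("user", "<username>")
    .option("password", "<password>")
    .load())
df.show()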

Azure SQL Database
Azure Data Lake Storage
Azure Synapse Analytics
Azure Data Factory

Accepted answer
  1. Vinodh247 34,661 Reputation points MVP Volunteer Moderator
    2024-03-10T12:18:52.3533333+00:00

    Hi Aditya Singh,

    Thanks for reaching out to Microsoft Q&A.

    I think the code below should work, with some changes to suit your scenario. If you run into any issues, could you describe them?

    // Import necessary libraries
    import org.apache.spark.sql.{SparkSession, DataFrame}
    
    // Define function to read data from SQL Server
    def readFromSqlServer(spark: SparkSession, jdbcUrl: String, table: String): DataFrame = {
        spark.read.format("jdbc")
            .option("url", jdbcUrl)
            .option("dbtable", table)
            .option("user", "<your_username>")
            .option("password", "<your_password>")
            .load()
    }
    
    // Define JDBC connection properties
    val jdbcUrl = "jdbc:sqlserver://<server>:<port>;databaseName=<database_name>"
    
    // Define table to read from SQL Server
    val tableName = "<table_name>"
    // Get the SparkSession (in a Synapse notebook, a session already exists as `spark`)
    val spark = SparkSession.builder()
        .appName("Read from SQL Server and Insert into Synapse Table")
        .getOrCreate()
    
    // Read data from SQL Server
    val dataFromSqlServer = readFromSqlServer(spark, jdbcUrl, tableName)
    
    // Perform transformations if necessary
    
    // Define SQL connection properties
    val synapseUrl = "jdbc:sqlserver://<synapse_server>.sql.azuresynapse.net:1433;database=<synapse_database>"
    val synapseUsername = "<synapse_username>"
    val synapsePassword = "<synapse_password>"
    // Write data to the Synapse table over JDBC.
    // ("com.databricks.spark.sqldw" is the Databricks-only Synapse connector and is
    // not available on a Synapse Spark pool; plain JDBC is used here instead. The
    // pool's built-in dedicated SQL pool connector, synapsesql, is another option.)
    dataFromSqlServer.write
        .format("jdbc")
        .option("url", synapseUrl)
        .option("dbtable", "<synapse_table_name>")
        .option("user", synapseUsername)
        .option("password", synapsePassword)
        .mode("append")
        .save()
    
    // Stop the SparkSession (skip this in a notebook, where the session is shared)
    spark.stop()
    
    

    Please 'Upvote'(Thumbs-up) and 'Accept' as an answer if the reply was helpful. This will benefit other community members who face the same issue.

    1 person found this answer helpful.

1 additional answer

  1. phemanth 15,755 Reputation points Microsoft External Staff Moderator
    2024-03-13T12:24:06.74+00:00

    @Aditya Singh

    Thanks for reaching out to Microsoft Q&A.

    Sure, I can help with that. If you want to update only specific columns in a table, you can read the data from the table, update the necessary columns, and then write the data back. Here's an example:

    from pyspark.sql import functions as F
    
    # Placeholders (the Scala variables from the first answer, redefined here for Python)
    jdbcUrl = "jdbc:sqlserver://<server>:<port>;databaseName=<database_name>"
    tableName = "<table_name>"
    synapseUrl = "jdbc:sqlserver://<synapse_server>.sql.azuresynapse.net:1433;database=<synapse_database>"
    synapseUsername = "<synapse_username>"
    synapsePassword = "<synapse_password>"
    
    # Read data from SQL Server over JDBC
    dataFromSqlServer = (spark.read.format("jdbc")
        .option("url", jdbcUrl)
        .option("dbtable", tableName)
        .option("user", "<your_username>")
        .option("password", "<your_password>")
        .load())
    
    # Suppose the columns you want to update are 'column1' and 'column2';
    # perform transformations on these columns
    dataFromSqlServer = dataFromSqlServer.withColumn('column1', F.expr("<transformation_expr_for_column1>"))
    dataFromSqlServer = dataFromSqlServer.withColumn('column2', F.expr("<transformation_expr_for_column2>"))
    
    # Write data back over JDBC, wrapped in parentheses so the chained calls
    # parse as one Python statement ("com.databricks.spark.sqldw" is a
    # Databricks-only connector and is not available on Synapse Spark pools)
    (dataFromSqlServer.write
        .format("jdbc")
        .option("url", synapseUrl)
        .option("dbtable", "<synapse_table_name>")
        .option("user", synapseUsername)
        .option("password", synapsePassword)
        .mode("overwrite")
        .save())
    

    In the above code, replace <transformation_expr_for_column1> and <transformation_expr_for_column2> with the transformations you want to apply to ‘column1’ and ‘column2’ respectively.
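
    For instance (a hypothetical transformation, not from the original answer), continuing the example above, you could uppercase 'column1' and increase 'column2' by ten percent:

    # Hypothetical transformations: any valid Spark SQL expression works in F.expr
    dataFromSqlServer = dataFromSqlServer.withColumn('column1', F.expr("UPPER(column1)"))
    dataFromSqlServer = dataFromSqlServer.withColumn('column2', F.expr("column2 * 1.1"))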

    Please note that the mode is set to “overwrite”. This means that the existing data in the table will be replaced with the new data. If you want to append the data instead, you can change the mode to “append”.
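
    One caveat worth adding (a note about Spark's generic JDBC writer, not part of the original answer): with the jdbc format, "overwrite" drops and recreates the table by default. Setting the truncate option keeps the existing table definition and only replaces the rows:

    # Overwrite rows without dropping the table (Spark JDBC truncate option)
    (dataFromSqlServer.write
        .format("jdbc")
        .option("url", synapseUrl)
        .option("dbtable", "<synapse_table_name>")
        .option("user", synapseUsername)
        .option("password", synapsePassword)
        .option("truncate", "true")  # truncate instead of drop/recreate on overwrite
        .mode("overwrite")
        .save())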

    1 person found this answer helpful.
