How to update a value in a Delta table column with a Map of Struct data type?

Mohammad Saber 591 Reputation points
2024-01-21T06:32:58.87+00:00

I have a Delta table in Databricks named prod.silver.control_table. It has a few columns, including table_name with string data type and transform_options with the following structure:

 |-- transform_options: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- col_name_mappings: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- type_mappings: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
 |    |    |-- partition_duplicates_by: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- order_duplicates_by: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
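
To make the structure concrete, a minimal DDL that reproduces this schema would be something like the following (the table and column names are from my actual table; the DDL itself is an illustrative sketch, not the original definition):

CREATE TABLE prod.silver.control_table (
  table_name STRING,
  transform_options MAP<STRING, STRUCT<
    col_name_mappings: MAP<STRING, STRING>,
    type_mappings: MAP<STRING, STRING>,
    partition_duplicates_by: ARRAY<STRING>,
    order_duplicates_by: ARRAY<STRING>
  >>
) USING DELTA;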

For example, when table_name is prod.silver.weather, the transform_options is:

{
  "prod.bronze.weather_source_a": {
    "col_name_mappings": {"col_a_old": "col_a_new", "col_b_old": "col_b_new"},
    "type_mappings": {"col_a_new": "INT", "col_b_new": "TIMESTAMP"},
    "partition_duplicates_by": ["col_a_new"],
    "order_duplicates_by": ["_commit_version"]
  },
  "prod.bronze.weather_source_b": {
    "col_name_mappings": {"col_c_old": "col_c_new", "col_d_old": "col_d_new"},
    "type_mappings": {"col_c_new": "INT", "col_d_new": "TIMESTAMP"},
    "partition_duplicates_by": ["col_c_new"],
    "order_duplicates_by": ["ingestion_timestamp", "_commit_version"]
  }
}

I need to update values in order_duplicates_by: every occurrence of _commit_version should become commit_version (i.e., the leading underscore is removed). In the example above, transform_options contains two key-value pairs, but that is not always the case; there might be only one. For example, when table_name is prod.silver.country, the transform_options is:

{
  "prod.bronze.country_source": {
    "col_name_mappings": {"col_a_old": "col_a_new", "col_b_old": "col_b_new"},
    "type_mappings": {"col_a_new": "INT", "col_b_new": "STRING"},
    "partition_duplicates_by": ["col_a_new"],
    "order_duplicates_by": ["_commit_version"]
  }
}
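For instance, after the update the order_duplicates_by array for prod.bronze.weather_source_b above should read ["ingestion_timestamp", "commit_version"], with all other fields unchanged.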

Any idea how to update these values? Note that I want to update the values in the control table itself. I would prefer a SQL command like the one below, but if there is a better way, please let me know:

UPDATE prod.silver.control_table
SET ...


Accepted answer
  1. ShaikMaheer-MSFT 38,546 Reputation points Microsoft Employee Moderator
    2024-02-01T06:58:57.5966667+00:00

    Hi Mohammad Saber, thank you for sharing the resolution details; this helps the whole community as well. Since you cannot accept your own answer, I am resharing it here. Kindly consider marking it as the accepted answer.

    UPDATE prod.silver.control_table
    SET transform_options = transform_values(
      transform_options, (k, v) ->
        CASE
          WHEN array_contains(v.order_duplicates_by, '_commit_version') THEN
            -- Rebuild the struct, swapping '_commit_version' for 'commit_version'
            STRUCT(
              v.col_name_mappings,
              v.type_mappings,
              v.partition_duplicates_by,
              array_append(
                array_remove(v.order_duplicates_by, '_commit_version'),
                'commit_version'
              ) AS order_duplicates_by
            )
          ELSE
            -- Leave entries that do not contain '_commit_version' unchanged
            STRUCT(v.col_name_mappings, v.type_mappings, v.partition_duplicates_by, v.order_duplicates_by)
        END
    )
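
    One behavioral note: array_remove deletes every occurrence of '_commit_version' and array_append adds 'commit_version' at the end, so the element's position within order_duplicates_by can change if it was not already last. The TRANSFORM/CASE variant in the answer below preserves element positions. Either way, a quick check such as the following (a sketch; it assumes only the columns described in the question) confirms the result:

    SELECT table_name,
           transform_values(transform_options, (k, v) -> v.order_duplicates_by) AS order_cols
    FROM prod.silver.control_table;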
    

2 additional answers

  1. Amira Bedhiafi 33,631 Reputation points Volunteer Moderator
    2024-01-21T15:11:33.6966667+00:00

    Since you prefer SQL for this task, here's a way to achieve it using Spark SQL. Do note, however, that complex updates like this are often more straightforward with the DataFrame APIs (in Scala or Python).

    UPDATE prod.silver.control_table
    SET transform_options =
        /* Rewrite each value of the map in place; the keys stay as they are */
        TRANSFORM_VALUES(transform_options, (key, value) ->
            /* Build a new struct with an updated order_duplicates_by */
            STRUCT(
                value.col_name_mappings AS col_name_mappings,
                value.type_mappings AS type_mappings,
                value.partition_duplicates_by AS partition_duplicates_by,
                /* Replace '_commit_version' with 'commit_version' in the array */
                TRANSFORM(value.order_duplicates_by, item ->
                    CASE WHEN item = '_commit_version' THEN 'commit_version' ELSE item END
                ) AS order_duplicates_by
            )
        )
    WHERE table_name = 'prod.silver.weather'
       OR table_name = 'prod.silver.country';
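
    Because TRANSFORM rewrites each array element in place, this variant also preserves the position of commit_version inside order_duplicates_by, whereas the array_remove/array_append approach in the accepted answer above moves it to the end of the array.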
    

  2. ShaikMaheer-MSFT 38,546 Reputation points Microsoft Employee Moderator
    2024-01-22T05:50:09.52+00:00

    Hi Mohammad Saber, thank you for posting your query on the Microsoft Q&A platform. To update a Delta table with the help of a DataFrame, kindly consider the code snippet below.

    from delta.tables import DeltaTable
    
    # Load the delta table as a dataframe (optional here, e.g. to inspect the current data)
    delta_df = spark.read.format("delta").load("path/to/delta_table")
    
    # Create a new dataframe with the updated data
    updates = ...
    
    # Merge the updates dataframe into the delta table
    deltaTable = DeltaTable.forPath(spark, "path/to/delta_table")
    deltaTable.alias("dt").merge(
        source=updates.alias("updates"),
        condition="dt.Id = updates.Id and dt.Type = updates.Type"
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
    
    
    
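    Note that the Id and Type columns in the merge condition are placeholders from a generic example; for the control table in this question, the natural merge key would be table_name, e.g. condition="dt.table_name = updates.table_name".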

    Hope this helps. Please let me know if you have any further queries.


    Please consider hitting the Accept Answer button. Accepted answers help the community as well. Thank you.

