Data load issue from Databricks DataFrame to Cosmos DB container

Ravada Rama Santosh 1 Reputation point
2022-09-21T06:31:02.633+00:00

I am trying to load data from an Azure Databricks DataFrame into a Cosmos DB container using the command below.

cfg = {
  "spark.cosmos.accountEndpoint": cosmosEndpoint,
  "spark.cosmos.accountKey": cosmosMasterKey,
  "spark.cosmos.database": cosmosDatabaseName,
  "spark.cosmos.container": cosmosContainerName,
}

Python command to load the data into Cosmos DB in append mode:
df.write.format("cosmos.oltp").options(**cfg).mode("append").save()

The DataFrame schema is:

id: string
countryCode: string
articles: array
  element: struct
    articleId: string
    mainCategoryId: string
    subCategoryId: string
    transactionDate: date
    size: string
My issue: when the target Cosmos container already has a document whose id matches the source for a particular id, the write replaces the articles array with the latest transactions instead of appending the new articles/products purchased by the customer.

The requirement is: if the id already exists with some previously purchased products, the latest purchases should be appended to the articles array of structs.

I have tried multiple options in the Databricks load command, such as

"spark.cosmos.write.strategy": "ItemAppend",
"Upsert": True

but nothing works. Please assist!

Background: I am loading customer transaction details for a retailer by manually running append loads for different time intervals, e.g. 2012 to 2017 and then 2017 to 2022. The same customer may have made transactions in both periods, so how will append mode handle this?
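For context on why this happens: the connector's document-level write strategies (append/upsert) write whole documents, so upserting an existing id replaces the entire articles array rather than merging into it. One workaround is a read-merge-write pass in your pipeline: read the existing document for each incoming id, merge the arrays in application code, and upsert the merged document. Below is a minimal sketch of the merge step in plain Python; the merge_articles helper and the choice of orderId as the dedup key are my assumptions, not part of the connector.

```python
def merge_articles(existing_doc, incoming_doc):
    """Merge the incoming document's articles into the existing one.

    Articles already present (same orderId) are kept once; new ones
    are appended. Top-level fields come from the incoming document.
    """
    merged = dict(incoming_doc)
    seen = {a["orderId"] for a in existing_doc.get("articles", [])}
    merged["articles"] = existing_doc.get("articles", []) + [
        a for a in incoming_doc.get("articles", []) if a["orderId"] not in seen
    ]
    return merged


# Example: a document already in the container (2021 load) plus a 2022 load.
old = {"id": "12345", "articles": [{"orderId": 12345, "articleId": "123"}]}
new = {"id": "12345", "articles": [{"orderId": 12343, "articleId": "123"}]}
doc = merge_articles(old, new)
# doc["articles"] now holds both orders; upsert this merged document.
```

In Databricks this merge would typically run on the joined old/new DataFrames before the final write, rather than per document as shown here.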


2 answers
  1. skumarrana 321 Reputation points
    2022-09-22T04:15:12.917+00:00

    Hello,

    It seems your use case could be addressed by using the ItemPatch write strategy for Cosmos DB. Take a look at this quickstart from Microsoft:
    https://learn.microsoft.com/en-us/azure/cosmos-db/sql/create-sql-api-spark?tabs=python#partial-document-update-using-patch

    Here are the operation types for the ItemPatch write strategy. I believe the "Add" operation type, used without an array index (specify the "-" character instead), should let you append an item to the array:
    https://learn.microsoft.com/en-us/azure/cosmos-db/partial-document-update#supported-operations
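    If it helps, here is a sketch of what the write options might look like. The key names follow the connector's patch configuration documentation, but the exact columnConfigs syntax varies by connector version, so please verify against your release; the placeholder values are mine.

```python
# Hypothetical patch configuration; verify key names and the
# columnConfigs syntax against your Cosmos DB Spark connector version.
cfg_patch = {
    "spark.cosmos.accountEndpoint": "https://<account>.documents.azure.com:443/",  # placeholder
    "spark.cosmos.accountKey": "<key>",        # placeholder
    "spark.cosmos.database": "<database>",     # placeholder
    "spark.cosmos.container": "<container>",   # placeholder
    # Partial document update instead of full-document upsert:
    "spark.cosmos.write.strategy": "ItemPatch",
    "spark.cosmos.write.patch.defaultOperationType": "Set",
    # Append each row's articles value at the end of the stored array;
    # "-" in the patch path means "after the last element".
    "spark.cosmos.write.patch.columnConfigs": "[col(articles).path(/articles/-).op(add)]",
}

# df.write.format("cosmos.oltp").options(**cfg_patch).mode("append").save()
```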

    Hoping it will help!

    Thanks.


  2. Ravada Rama Santosh 1 Reputation point
    2022-09-23T10:45:30.437+00:00

    I have already tried this solution, but it didn't work: it gave multiple errors whether I used "-", an index number, or the actual array element name. The source DataFrame schema is below; note that the articles array contains struct elements.
    The DataFrame schema is:

    id: string
    countryCode: string
    articles: array
      element: struct
        articleId: string
        mainCategoryId: string
        subCategoryId: string
        transactionDate: date
        size: string

    The business requirement: when we load data for a particular year, say 2021, transactions for different customers (identified by id) are loaded as shown below. For example, customer 12345 purchased two products in 2021:

    {
      "id": "12345",
      "countryCode": "IN",
      "articles": [
        {
          "articleId": "123",
          "mainCategoryId": "Boots",
          "subCategoryId": "ladies",
          "purchasePrice": 200,
          "orderId": 12345,
          "transactionDate": 17861,
          "size": "38"
        },
        {
          "articleId": "456",
          "mainCategoryId": "Boots",
          "subCategoryId": "men",
          "purchasePrice": 300,
          "orderId": 12367,
          "transactionDate": 17867,
          "size": "38"
        }
      ],
      "_rid": "XXXXX",
      "_self": "YYY",
      "_etag": "ZZZ",
      "_attachments": "attachments/",
      "_ts": 1660823865
    }

    When we load data for the year 2022, it should append the articles newly purchased by the customer in 2022 alongside the articles already purchased in 2021.

    Ideal behavior:
    {
      "id": "12345",
      "countryCode": "IN",
      "articles": [
        {
          "articleId": "123",
          "mainCategoryId": "Boots",
          "subCategoryId": "ladies",
          "purchasePrice": 200,
          "orderId": 12345,
          "transactionDate": 17861,
          "size": "38"
        },
        {
          "articleId": "456",
          "mainCategoryId": "Boots",
          "subCategoryId": "men",
          "purchasePrice": 300,
          "orderId": 12367,
          "transactionDate": 17867,
          "size": "38"
        },
        {
          "articleId": "123",
          "mainCategoryId": "Tshirt",
          "subCategoryId": "ladies",
          "purchasePrice": 500,
          "orderId": 12343,
          "transactionDate": 17889,
          "size": "L"
        },
        {
          "articleId": "456",
          "mainCategoryId": "Tshirt",
          "subCategoryId": "men",
          "purchasePrice": 600,
          "orderId": 12382,
          "transactionDate": 17889,
          "size": "L"
        }
      ],
      "_rid": "XXXXX",
      "_self": "YYY",
      "_etag": "ZZZ",
      "_attachments": "attachments/",
      "_ts": 1660823865
    }

    The issue is that it replaces the existing articles with the new ones:

    {
      "id": "12345",
      "countryCode": "IN",
      "articles": [
        {
          "articleId": "123",
          "mainCategoryId": "Tshirt",
          "subCategoryId": "ladies",
          "purchasePrice": 500,
          "orderId": 12343,
          "transactionDate": 17889,
          "size": "L"
        },
        {
          "articleId": "456",
          "mainCategoryId": "Tshirt",
          "subCategoryId": "men",
          "purchasePrice": 600,
          "orderId": 12382,
          "transactionDate": 17889,
          "size": "L"
        }
      ],
      "_rid": "XXXXX",
      "_self": "YYY",
      "_etag": "ZZZ",
      "_attachments": "attachments/",
      "_ts": 1660823865
    }
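    Since the full-document upsert always replaces the array wholesale, another option is to rebuild the complete articles array per id from all period loads before the final write, and then let the overwrite happen. In Spark this would be an explode / groupBy(id) / collect_list aggregation over the union of the period DataFrames; below is the same idea as an illustrative plain-Python sketch (combine_periods is my name, not an API).

```python
def combine_periods(docs):
    """Combine per-period documents into one document per id,
    concatenating their articles arrays (illustrative sketch)."""
    by_id = {}
    for d in docs:
        cur = by_id.setdefault(d["id"], {**d, "articles": []})
        cur["articles"].extend(d["articles"])
    return list(by_id.values())


# Two period loads for the same customer, as in the example above.
docs_2021 = [{"id": "12345", "countryCode": "IN",
              "articles": [{"orderId": 12345}, {"orderId": 12367}]}]
docs_2022 = [{"id": "12345", "countryCode": "IN",
              "articles": [{"orderId": 12343}, {"orderId": 12382}]}]

combined = combine_periods(docs_2021 + docs_2022)
# One document per id carrying all four articles; writing these with a
# full-document strategy then produces the desired end state.
```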
