Quickstart: Manage data with Azure Cosmos DB Spark 3 OLTP Connector for API for NoSQL

APPLIES TO: NoSQL

This quickstart shows how to use the Azure Cosmos DB Spark Connector to read from and write to Azure Cosmos DB. The connector supports Spark 3.1.x, 3.2.x, and 3.3.x.

Throughout this quickstart, we rely on Azure Databricks Runtime 12.2 with Spark 3.3.2 and a Jupyter Notebook to show how to use the Azure Cosmos DB Spark Connector.

You should be able to use any language supported by Spark (PySpark, Scala, Java, etc.), or any Spark interface you are familiar with (Jupyter Notebook, Livy, etc.).

Prerequisites

  • An Azure account with an active subscription.

  • Azure Databricks runtime 12.2 with Spark 3.3.2

  • (Optional) An SLF4J binding, used to associate a specific logging framework with SLF4J.

SLF4J is only needed if you plan to use logging. In that case, also download an SLF4J binding, which links the SLF4J API with the logging implementation of your choice. See the SLF4J user manual for more information.

Install the Azure Cosmos DB Spark Connector in your Spark cluster using the latest version for Spark 3.3.x.

This getting started guide is based on PySpark/Scala; you can run the following code snippets in an Azure Databricks PySpark/Scala notebook.
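
Optionally, you can confirm the Spark version of your cluster from the notebook before you start:

# verify the cluster's Spark version; Azure Databricks Runtime 12.2 ships with Spark 3.3.2
print(spark.version)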

Create databases and containers

First, set Azure Cosmos DB account credentials, and the Azure Cosmos DB Database name and container name.

cosmosEndpoint = "https://REPLACEME.documents.azure.com:443/"
cosmosMasterKey = "REPLACEME"
cosmosDatabaseName = "sampleDB"
cosmosContainerName = "sampleContainer"

cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName,
}

Next, you can use the new Catalog API to create an Azure Cosmos DB Database and Container through Spark.

# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# create an Azure Cosmos DB database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))

# create an Azure Cosmos DB container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))

When creating containers with the Catalog API, you can set the throughput and partition key path for the container to be created.
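
For example, instead of manualThroughput you can provision autoscale throughput with the autoScaleMaxThroughput table property (a minimal sketch; the container name sampleContainerAutoscale is illustrative, and the full list of supported table properties is in the Catalog API documentation):

# create a container with autoscale throughput (scales up to a maximum of 4000 RU/s) instead of manual throughput
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', autoScaleMaxThroughput = '4000')".format(cosmosDatabaseName, "sampleContainerAutoscale"))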

For more information, see the full Catalog API documentation.

Ingest data

The name of the data source is cosmos.oltp, and the following example shows how you can write an in-memory dataframe consisting of two items to Azure Cosmos DB:

spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
  .toDF("id","name","age","isAlive")\
  .write\
  .format("cosmos.oltp")\
  .options(**cfg)\
  .mode("APPEND")\
  .save()

Note that id is a mandatory field for Azure Cosmos DB.
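
The connector upserts items by default (the ItemOverwrite strategy). If you only want to insert new documents and skip ones whose id already exists, you can switch the write strategy (a minimal sketch; the cfgAppend name and sample row are illustrative, and the available strategies are listed in the write configuration documentation):

cfgAppend = dict(cfg)
# ItemAppend inserts new documents and ignores conflicts for ids that already exist;
# the default strategy, ItemOverwrite, upserts instead
cfgAppend["spark.cosmos.write.strategy"] = "ItemAppend"

spark.createDataFrame([("cat-maybe-alive", "Schrodinger cat", 2, True)])\
  .toDF("id","name","age","isAlive")\
  .write\
  .format("cosmos.oltp")\
  .options(**cfgAppend)\
  .mode("APPEND")\
  .save()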

For more information related to ingesting data, see the full write configuration documentation.

Query data

Using the same cosmos.oltp data source, we can query data and use filter to push filter predicates down to Azure Cosmos DB:

from pyspark.sql.functions import col

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

df.filter(col("isAlive") == True)\
 .show()
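
You can also push an entire query down to Azure Cosmos DB instead of relying on filter push-down by supplying a custom query (a minimal sketch; the query text is illustrative):

# the custom query is executed by Azure Cosmos DB rather than being applied by Spark after loading
dfCustomQuery = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.customQuery", "SELECT * FROM c WHERE c.isAlive = true")\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()

dfCustomQuery.show()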

For more information related to querying data, see the full query configuration documentation.

Partial document update using Patch

Using the same cosmos.oltp data source, we can perform partial document updates in Azure Cosmos DB by using the Patch API:

cfgPatch = {"spark.cosmos.accountEndpoint": cosmosEndpoint,
          "spark.cosmos.accountKey": cosmosMasterKey,
          "spark.cosmos.database": cosmosDatabaseName,
          "spark.cosmos.container": cosmosContainerName,
          "spark.cosmos.write.strategy": "ItemPatch",
          "spark.cosmos.write.bulk.enabled": "false",
          "spark.cosmos.write.patch.defaultOperationType": "Set",
          "spark.cosmos.write.patch.columnConfigs": "[col(name).op(set)]"
          }

id = "<document-id>"
query = "select * from cosmosCatalog.{}.{} where id = '{}';".format(
    cosmosDatabaseName, cosmosContainerName, id)

dfBeforePatch = spark.sql(query)
print("document before patch operation")
dfBeforePatch.show()

data = [{"id": id, "name": "Joel Brakus"}]
patchDf = spark.createDataFrame(data)

patchDf.write.format("cosmos.oltp").mode("Append").options(**cfgPatch).save()

dfAfterPatch = spark.sql(query)
print("document after patch operation")
dfAfterPatch.show()
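
The columnConfigs mapping supports other patch operations as well. For example, a numeric field can be incremented instead of being replaced (a minimal sketch reusing the age column from the sample data; the operation names follow the partial document update API, so check the patch configuration documentation if in doubt):

cfgPatchIncrement = dict(cfgPatch)
# increment the existing /age value by the amount in the dataframe column instead of overwriting it
cfgPatchIncrement["spark.cosmos.write.patch.columnConfigs"] = "[col(age).op(increment)]"

incrementDf = spark.createDataFrame([{"id": id, "age": 1}])
incrementDf.write.format("cosmos.oltp").mode("Append").options(**cfgPatchIncrement).save()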

For more samples related to partial document update, see the GitHub code sample Patch Sample.

Schema inference

When querying data, the Spark Connector can infer the schema based on sampling existing items by setting spark.cosmos.read.inferSchema.enabled to true.

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .load()
 
df.printSchema()


# Alternatively, you can pass a custom schema to use when reading the data:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType

customSchema = StructType([
      StructField("id", StringType()),
      StructField("name", StringType()),
      StructField("type", StringType()),
      StructField("age", IntegerType()),
      StructField("isAlive", BooleanType())
    ])

df = spark.read.schema(customSchema).format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()

# If no custom schema is specified and schema inference is disabled, the resulting data will contain the raw JSON content of the items:

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .load()
 
df.printSchema()
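
Schema inference can also be tuned. For example, you can control how many items are sampled and whether system properties are included in the inferred schema (a minimal sketch; the option names come from the schema inference configuration reference and the values shown are illustrative):

df = spark.read.format("cosmos.oltp").options(**cfg)\
 .option("spark.cosmos.read.inferSchema.enabled", "true")\
 .option("spark.cosmos.read.inferSchema.samplingSize", "100")\
 .option("spark.cosmos.read.inferSchema.includeSystemProperties", "false")\
 .load()

df.printSchema()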

For more information related to schema inference, see the full schema inference configuration documentation.

Raw JSON support for Spark Connector

When working with Azure Cosmos DB, you may come across documents that contain an array of entries with potentially different structures. In the sample below, each document has a "tags" array whose items vary in shape, and each item carries a "tag_id" field that serves as an entity type identifier. To patch such documents efficiently in Spark, you can use a custom function that reshapes the array before writing it back.

Sample document that can be used

{
    "id": "Test01",
    "document_type": "tag",
    "tags": [
        {
            "tag_id": "key_val",
            "params": "param1=val1;param2=val2"
        },
        {
            "tag_id": "arrays",
            "tags": "tag1,tag2,tag3"
        }
    ]
}

import json

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# cosmosContainerNameTarget holds the name of the container whose documents will be patched
def init_sequences_db_config():
    #Configure Config for Cosmos DB Patch and Query
    global cfgSequencePatch
    cfgSequencePatch = {"spark.cosmos.accountEndpoint": cosmosEndpoint,
          "spark.cosmos.accountKey": cosmosMasterKey,
          "spark.cosmos.database": cosmosDatabaseName,
          "spark.cosmos.container": cosmosContainerNameTarget,
          "spark.cosmos.write.strategy": "ItemPatch", # Partial update all documents based on the patch config
          "spark.cosmos.write.bulk.enabled": "true",
          "spark.cosmos.write.patch.defaultOperationType": "Replace",
          "spark.cosmos.read.inferSchema.enabled": "false"
          }
    
def adjust_tag_array(rawBody):
    print("test adjust_tag_array")
    array_items = json.loads(rawBody)["tags"]
    print(json.dumps(array_items))
    
    output_json = []

    for item in array_items:
        output_json_item = {}
        # Handle different tag types
        if item["tag_id"] == "key_val":
            output_json_item.update({"tag_id" : item["tag_id"]})
            params = item["params"].split(";")
            for p in params:
                key_val = p.split("=")
                element = {key_val[0]: key_val[1]}
                output_json_item.update(element)

        if item["tag_id"] == "arrays":
            tags_array = item["tags"].split(",")
            output_json_item.update({"tags": tags_array})
                        
        output_json.append(output_json_item)

    # convert to raw json
    return json.dumps(output_json)


init_sequences_db_config()

native_query = "SELECT c.id, c.tags, c._ts from c where EXISTS(SELECT VALUE t FROM t IN c.tags WHERE IS_DEFINED(t.tag_id))"

# the custom query will be processed against the Cosmos endpoint
cfgSequencePatch["spark.cosmos.read.customQuery"] = native_query
# Cosmos DB patch column configs
cfgSequencePatch["spark.cosmos.write.patch.columnConfigs"] = "[col(tags_new).path(/tags).op(set).rawJson]"

# load df
df_relevant_sequences = spark.read.format("cosmos.oltp").options(**cfgSequencePatch).load()
print(df_relevant_sequences)
df_relevant_sequences.show(20, False)
if not df_relevant_sequences.isEmpty():
    print("Found sequences to patch")
    
    # prepare udf function
    tags_udf = udf(lambda a: adjust_tag_array(a), StringType())

    df_relevant_sequences.show(20, False)

    # apply udf function for patching raw json
    df_relevant_sequences_adjusted = df_relevant_sequences.withColumn("tags_new", tags_udf("_rawBody"))
    df_relevant_sequences_adjusted.show(20, False)

    # write df
    output_df = df_relevant_sequences_adjusted.select("id","tags_new")
    output_df.write.format("cosmos.oltp").mode("Append").options(**cfgSequencePatch).save()
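
To verify the result, you can read the affected documents back with the same configuration and custom query; the patched tags arrays should now contain the reshaped entries:

# re-run the same read to confirm the /tags arrays were replaced with the adjusted raw JSON
df_patched = spark.read.format("cosmos.oltp").options(**cfgSequencePatch).load()
df_patched.show(20, False)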

Configuration reference

The Azure Cosmos DB Spark 3 OLTP Connector for API for NoSQL has a complete configuration reference that provides more advanced settings for writing and querying data, serialization, streaming using change feed, partitioning and throughput management and more. For a complete listing with details, see our Spark Connector Configuration Reference on GitHub.
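
For example, the connector also exposes the Azure Cosmos DB change feed as a Structured Streaming source through the cosmos.oltp.changeFeed format. The following sketch streams changes from the container to the console (the checkpoint path and option values are illustrative; see the configuration reference for the full set of change feed options):

changeFeedCfg = dict(cfg)
# read the raw change feed documents and start from the beginning of the container
changeFeedCfg["spark.cosmos.read.inferSchema.enabled"] = "false"
changeFeedCfg["spark.cosmos.changeFeed.startFrom"] = "Beginning"

changeFeedDF = spark.readStream.format("cosmos.oltp.changeFeed")\
  .options(**changeFeedCfg)\
  .load()

# a checkpoint location is required for streaming queries; replace the path with a durable location
streamingQuery = changeFeedDF.writeStream\
  .format("console")\
  .option("checkpointLocation", "/tmp/cosmos-changefeed-checkpoint")\
  .start()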

Microsoft Entra authentication

  1. Follow the instructions on how to register an application with Microsoft Entra ID and create a service principal.

  2. You should still be in Azure portal > Microsoft Entra ID > App Registrations. In the Certificates & secrets section, create a new secret. Save the value for later.

  3. Select the Overview tab and find the values for clientId and tenantId, along with the clientSecret that you created earlier, and the cosmosEndpoint, subscriptionId, and resourceGroupName from your account. Create a notebook as below and replace the configurations with the appropriate values:

    cosmosDatabaseName = "AADsampleDB"
    cosmosContainerName = "sampleContainer"
    authType = "ServicePrinciple"
    cosmosEndpoint = "<replace with URI of your Cosmos DB account>"
    subscriptionId = "<replace with subscriptionId>"
    tenantId = "<replace with Directory (tenant) ID from the portal>"
    resourceGroupName = "<replace with the resourceGroup name>"
    clientId = "<replace with Application (client) ID from the portal>"
    clientSecret = "<replace with application secret value you created earlier>"
    
    cfg = {
        "spark.cosmos.accountEndpoint" : cosmosEndpoint,
        "spark.cosmos.auth.type" : authType,
        "spark.cosmos.account.subscriptionId" : subscriptionId,
        "spark.cosmos.account.tenantId" : tenantId,
        "spark.cosmos.account.resourceGroupName" : resourceGroupName,
        "spark.cosmos.auth.aad.clientId" : clientId,
        "spark.cosmos.auth.aad.clientSecret" : clientSecret,
        "spark.cosmos.database" : cosmosDatabaseName,
        "spark.cosmos.container" : cosmosContainerName
    }
    
    # Configure Catalog Api to be used
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.type", authType)
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.subscriptionId", subscriptionId)
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.tenantId", tenantId)
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.account.resourceGroupName", resourceGroupName)
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientId", clientId)
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.aad.clientSecret", clientSecret)
    
    # create an Azure Cosmos DB database using catalog api
    spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))
    
    # create an Azure Cosmos DB container using catalog api
    spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))
    
    spark.createDataFrame((("cat-alive", "Schrodinger cat", 2, True), ("cat-dead", "Schrodinger cat", 2, False)))\
      .toDF("id","name","age","isAlive")\
      .write\
      .format("cosmos.oltp")\
      .options(**cfg)\
      .mode("APPEND")\
      .save()
    
    

    Tip

    In this quickstart example, credentials are assigned to variables in clear text, but for security we recommend using secrets. Review instructions on how to secure credentials in Azure Synapse Apache Spark with linked services using the TokenLibrary. Or, if using Databricks, review how to create an Azure Key Vault backed secret scope or a Databricks backed secret scope. For configuring secrets, review how to add secrets to your Spark configuration.
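
    For example, on Azure Databricks the application secret can be read from a secret scope at runtime instead of being assigned in clear text (a minimal sketch; the scope and key names are placeholders you would create yourself):

    # read the client secret from a Databricks secret scope instead of hard-coding it in the notebook
    clientSecret = dbutils.secrets.get(scope="<your-secret-scope>", key="<your-client-secret-key>")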

  4. Create a role using the az cosmosdb sql role definition create command. Pass in the Cosmos DB account name and resource group, followed by a body of JSON that defines the custom role. The following example creates a role named SparkConnectorAAD with permissions to read and write items in Cosmos DB containers. The role is also scoped to the account level using /.

    resourceGroupName='<myResourceGroup>'
    accountName='<myCosmosAccount>'
    az cosmosdb sql role definition create \
       --account-name $accountName \
       --resource-group $resourceGroupName \
       --body '{
           "RoleName": "SparkConnectorAAD",
           "Type": "CustomRole",
           "AssignableScopes": ["/"],
           "Permissions": [{
               "DataActions": [
                   "Microsoft.DocumentDB/databaseAccounts/readMetadata",
                   "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/*",
                   "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/*"
               ]
           }]
       }'
    
  5. Now list the role definition you created to fetch its ID:

        az cosmosdb sql role definition list --account-name $accountName --resource-group $resourceGroupName 
    
  6. This should return a response like the one below. Record the id value.

        [
          {
            "assignableScopes": [
              "/subscriptions/<mySubscriptionId>/resourceGroups/<myResourceGroup>/providers/Microsoft.DocumentDB/databaseAccounts/<myCosmosAccount>"
            ],
            "id": "/subscriptions/<mySubscriptionId>/resourceGroups/<myResourceGroup>/providers/Microsoft.DocumentDB/databaseAccounts/<myCosmosAccount>/sqlRoleDefinitions/<roleDefinitionId>",
            "name": "<roleDefinitionId>",
            "permissions": [
              {
                "dataActions": [
                  "Microsoft.DocumentDB/databaseAccounts/readMetadata",
                  "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/*",
                  "Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/*"
                ],
                "notDataActions": []
              }
            ],
            "resourceGroup": "<myResourceGroup>",
            "roleName": "MyReadWriteRole",
            "sqlRoleDefinitionGetResultsType": "CustomRole",
            "type": "Microsoft.DocumentDB/databaseAccounts/sqlRoleDefinitions"
          }
        ]
    
  7. Now go to Azure portal > Microsoft Entra ID > Enterprise Applications and search for the application you created earlier. Record the Object ID found here.

    Note

    Make sure to use its Object ID as found in the Enterprise applications section of the Microsoft Entra admin center blade (and not the App registrations section you used earlier).

  8. Now create a role assignment. Replace <aadPrincipalId> with the Object ID you recorded above (note that this is NOT the same as the Object ID visible in the App registrations view you saw earlier). Also replace <myResourceGroup> and <myCosmosAccount> accordingly below. Replace <roleDefinitionId> with the id value fetched by running the az cosmosdb sql role definition list command above. Then run in Azure CLI:

    resourceGroupName='<myResourceGroup>'
    accountName='<myCosmosAccount>'
    readOnlyRoleDefinitionId='<roleDefinitionId>' # as fetched above
    # For Service Principals make sure to use the Object ID as found in the Enterprise applications section of the Azure Active Directory portal blade.
    principalId='<aadPrincipalId>'
    az cosmosdb sql role assignment create --account-name $accountName --resource-group $resourceGroupName --scope "/" --principal-id $principalId --role-definition-id $readOnlyRoleDefinitionId
    
  9. Now that you have created a Microsoft Entra application and service principal, created a custom role, and assigned that role permissions to your Cosmos DB account, you should be able to run your notebook.

Migrate to Spark 3 Connector

If you are using our older Spark 2.4 Connector, you can find out how to migrate to the Spark 3 Connector here.

Next steps