Tutorial: Connect to Azure Cosmos DB for NoSQL using Spark

APPLIES TO: NoSQL

In this tutorial, you use the Azure Cosmos DB Spark connector to read or write data from an Azure Cosmos DB for NoSQL account. This tutorial uses Azure Databricks and a Jupyter notebook to illustrate how to integrate with the API for NoSQL from Spark. This tutorial focuses on Python and Scala even though you can use any language or interface supported by Spark.

In this tutorial, you learn how to:

  • Connect to an API for NoSQL account using Spark and a Jupyter notebook
  • Create database and container resources
  • Ingest data to the container
  • Query data in the container
  • Perform common operations on items in the container

Prerequisites

  • An existing Azure Cosmos DB for NoSQL account.
  • An existing Azure Databricks workspace.

Connect using Spark and Jupyter

Use your existing Azure Databricks workspace to create a compute cluster ready to use Apache Spark 3.4.x to connect to your Azure Cosmos DB for NoSQL account.

  1. Open your Azure Databricks workspace.

  2. In the workspace interface, create a new cluster. Configure the cluster with these settings, at a minimum:

    Value
    Runtime version 13.3 LTS (Scala 2.12, Spark 3.4.1)
  3. Use the workspace interface to search for Maven packages from Maven Central with a Group Id of com.azure.cosmos.spark. Install the package specific for Spark 3.4 with an Artifact Id prefixed with azure-cosmos-spark_3-4 to the cluster.

  4. Finally, create a new notebook.

    Tip

    By default, the notebook will be attached to the recently created cluster.

  5. Within the notebook, set OLTP configuration settings for NoSQL account endpoint, database name, and container name.

    # Set configuration settings
    config = {
      "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
      "spark.cosmos.accountKey": "<nosql-account-key>",
      "spark.cosmos.database": "cosmicworks",
      "spark.cosmos.container": "products"
    }
    
    # Set configuration settings
    val config = Map(
      "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
      "spark.cosmos.accountKey" -> "<nosql-account-key>",
      "spark.cosmos.database" -> "cosmicworks",
      "spark.cosmos.container" -> "products"
    )
    

Create a database and container

Use the Catalog API to manage account resources such as databases and containers. Then, you can use OLTP to manage data within the container resource[s].

  1. Configure the Catalog API to manage API for NoSQL resources using Spark.

    # Configure Catalog Api    
    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"]) 
    
    // Configure Catalog Api  
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
    spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
    
  2. Create a new database named cosmicworks using CREATE DATABASE IF NOT EXISTS.

    # Create a database using the Catalog API    
    spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
    // Create a database using the Catalog API  
    spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
    
  3. Create a new container named products using CREATE TABLE IF NOT EXISTS. Ensure that you set the partition key path to /category and enable autoscale throughput with a maximum throughput of 1000 request units per second (RU/s).

    # Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
    // Create a products container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
    
  4. Create another container named employees using a hierarchical partition key configuration with /organization, /department, and /team as the set of partition key paths in that specific order. Also, set the throughput to a manual amount of 400 RU/s

    # Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
    // Create an employees container using the Catalog API
    spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
    
  5. Run the notebook cell[s] to validate that your database and containers are created within your API for NoSQL account.

Ingest data

Create a sample dataset and then use OLTP to ingest that data to the API for NoSQL container.

  1. Create a sample data set.

    # Create sample data    
    products = (
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
    )
    
    // Create sample data
    val products = Seq(
      ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
      ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
    )
    
  2. Use spark.createDataFrame and the previously saved OLTP configuration to add sample data to the target container.

    # Ingest sample data    
    spark.createDataFrame(products) \
      .toDF("id", "category", "name", "quantity", "price", "clearance") \
      .write \
      .format("cosmos.oltp") \
      .options(**config) \
      .mode("APPEND") \
      .save()
    
    // Ingest sample data
    spark.createDataFrame(products)
      .toDF("id", "category", "name", "quantity", "price", "clearance")
      .write
      .format("cosmos.oltp")
      .options(config)
      .mode("APPEND")
      .save()
    

Query data

Load OLTP data into a data frame to perform common queries on the data. You can use various syntaxes filter or query data.

  1. Use spark.read to load the OLTP data into a dataframe object. Use the same configuration used earlier in this tutorial. Also, set spark.cosmos.read.inferSchema.enabled to true to allow the Spark connector to infer the schema by sampling existing items.

    # Load data    
    df = spark.read.format("cosmos.oltp") \
      .options(**config) \
      .option("spark.cosmos.read.inferSchema.enabled", "true") \
      .load()
    
    // Load data
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. Render the schema of the data loaded in the dataframe using printSchema.

    # Render schema    
    df.printSchema()
    
    // Render schema    
    df.printSchema()
    
  3. Render data rows where the quantity column is less than 20. Use the where and show functions to perform this query.

    # Render filtered data    
    df.where("quantity < 20") \
      .show()
    
    // Render filtered data
    df.where("quantity < 20")
      .show()
    
  4. Render the first data row where the clearance column is true. Use the filter function to perform this query.

    # Render 1 row of flitered data    
    df.filter(df.clearance == True) \
      .show(1)
    
    // Render 1 row of flitered data
    df.filter($"clearance" === true)
      .show(1)
    
  5. Render five rows of data with no filter or truncation. Use the show function to customize the appearance and number of rows that are rendered.

    # Render five rows of unfiltered and untruncated data    
    df.show(5, False)
    
    // Render five rows of unfiltered and untruncated data    
    df.show(5, false)
    
  6. Query your data using this raw NoSQL query string: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

    # Render results of raw query    
    rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    rawDf = spark.sql(rawQuery)
    rawDf.show()
    
    // Render results of raw query    
    val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
    val rawDf = spark.sql(rawQuery)
    rawDf.show()
    

Perform common operations

When working with API for NoSQL data in Spark, you can perform partial updates or work with data as raw JSON.

  1. To perform a partial update of an item, perform these steps:

    1. Copy the existing config configuration variable and modify the properties in the new copy. Specifically; configure the write strategy to ItemPatch, disable bulk support, set the columns and mapped operations, and finally set the default operation type to Set.

      # Copy and modify configuration
      configPatch = dict(config)
      configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
      configPatch["spark.cosmos.write.bulk.enabled"] = "false"
      configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
      configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
      
      // Copy and modify configuration
      val configPatch = scala.collection.mutable.Map.empty ++ config
      configPatch ++= Map(
        "spark.cosmos.write.strategy" -> "ItemPatch",
        "spark.cosmos.write.bulk.enabled" -> "false",
        "spark.cosmos.write.patch.defaultOperationType" -> "Set",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
      )
      
    2. Create variables for the item partition key and unique identifier that you intend to target as part of this patch operation.

      # Specify target item id and partition key
      targetItemId = "68719518391"
      targetItemPartitionKey = "gear-surf-surfboards"
      
      // Specify target item id and partition key
      val targetItemId = "68719518391"
      val targetItemPartitionKey = "gear-surf-surfboards"
      
    3. Create a set of patch objects to specify the target item and specify fields that should be modified.

      # Create set of patch diffs
      patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
      
      // Create set of patch diffs
      val patchProducts = Seq(
        (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
      )
      
    4. Create a data frame using the set of patch objects and use write to perform the patch operation.

      # Create data frame
      spark.createDataFrame(patchProducts) \
        .write \
        .format("cosmos.oltp") \
        .options(**configPatch) \
        .mode("APPEND") \
        .save()
      
      // Create data frame
      patchProducts
        .toDF("id", "category", "name")
        .write
        .format("cosmos.oltp")
        .options(configPatch)
        .mode("APPEND")
        .save()
      
    5. Run a query to review the results of the patch operation. The item should now be named Yamba New Surfboard with no other changes.

      # Create and run query
      patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
      patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
      // Create and run query
      val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
      val patchDf = spark.sql(patchQuery)
      patchDf.show(1)
      
  2. To work with raw JSON data, perform these steps:

    1. Copy the existing config configuration variable and modify the properties in the new copy. Specifically; change the target container to employees and configure the contacts column/field to use raw JSON data.

      # Copy and modify configuration
      configRawJson = dict(config)
      configRawJson["spark.cosmos.container"] = "employees"
      configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
      
      // Copy and modify configuration
      val configRawJson = scala.collection.mutable.Map.empty ++ config
      configRawJson ++= Map(
        "spark.cosmos.container" -> "employees",
        "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
      )
      
    2. Create a set of employees to ingest into the container.

      # Create employee data
      employees = (
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), 
      )
      
      // Create employee data
      val employees = Seq(
        ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry",  """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
      )
      
    3. Create a data frame and use write to ingest the employee data.

      # Ingest data
      spark.createDataFrame(employees) \
        .toDF("id", "organization", "department", "team", "name", "contacts") \
        .write \
        .format("cosmos.oltp") \
        .options(**configRawJson) \
        .mode("APPEND") \
        .save()
      
      // Ingest data
      spark.createDataFrame(employees)
        .toDF("id", "organization", "department", "team", "name", "contacts")
        .write
        .format("cosmos.oltp")
        .options(configRawJson)
        .mode("APPEND")
        .save()
      
    4. Render the data from the data frame using show. Observe that the contacts column is raw JSON in the output.

      # Read and render data
      rawJsonDf = spark.read.format("cosmos.oltp") \
        .options(**configRawJson) \
        .load()
      rawJsonDf.show()
      
      // Read and render data
      val rawJsonDf = spark.read.format("cosmos.oltp")
        .options(configRawJson)
        .load()
      rawJsonDf.show()
      

Next step