Tutorial: Connect to Azure Cosmos DB for NoSQL by using Spark
APPLIES TO: NoSQL
In this tutorial, you use the Azure Cosmos DB Spark connector to read and write data in an Azure Cosmos DB for NoSQL account. The tutorial uses Azure Databricks and a Jupyter notebook to illustrate how to integrate with the API for NoSQL from Spark. It focuses on Python and Scala, although you can use any language or interface supported by Spark.
In this tutorial, you learn how to:
Connect to an API for NoSQL account by using Spark and a Jupyter notebook.
Create database and container resources.
Ingest data to the container.
Query data in the container.
Perform common operations on items in the container.
Use your existing Azure Databricks workspace to create a compute cluster that runs Apache Spark 3.4.x, which you can use to connect to your Azure Cosmos DB for NoSQL account.
Open your Azure Databricks workspace.
In the workspace interface, create a new cluster. Configure the cluster with these settings, at a minimum:
Runtime version: 13.3 LTS (Scala 2.12, Spark 3.4.1)
Use the workspace interface to search Maven Central for packages with a Group ID of com.azure.cosmos.spark. Install the package built specifically for Spark 3.4, which has an Artifact ID prefixed with azure-cosmos-spark_3-4, to the cluster.
Finally, create a new notebook.
Tip
By default, the notebook is attached to the recently created cluster.
Within the notebook, set online transaction processing (OLTP) configuration settings for the NoSQL account endpoint, database name, and container name.
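As an example, a minimal Python configuration might look like the following sketch. The placeholder endpoint and key values are assumptions; replace them with the values for your account:
# Set OLTP configuration settings (placeholder endpoint and key values are assumptions)
config = {
  "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
  "spark.cosmos.accountKey": "<nosql-account-key>",
  "spark.cosmos.database": "cosmicworks",
  "spark.cosmos.container": "products"
}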
Use the Catalog API to manage account resources such as databases and containers. Then, you can use OLTP to manage data within the container resources.
Configure the Catalog API to manage API for NoSQL resources by using Spark.
# Configure Catalog Api
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api
spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Create a new database named cosmicworks by using CREATE DATABASE IF NOT EXISTS.
# Create a database by using the Catalog API
spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database by using the Catalog API
spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Create a new container named products by using CREATE TABLE IF NOT EXISTS. Ensure that you set the partition key path to /category and enable autoscale throughput with a maximum throughput of 1000 request units (RUs) per second.
# Create a products container by using the Catalog API
spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container by using the Catalog API
spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Create another container named employees by using a hierarchical partition key configuration. Use /organization, /department, and /team as the set of partition key paths. Follow that specific order. Also, set the throughput to a manual amount of 400 RUs.
# Create an employees container by using the Catalog API
spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container by using the Catalog API
spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Run the notebook cells to validate that your database and containers are created within your API for NoSQL account.
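For example, you can list the containers in the new database directly from the notebook. This is a sketch that assumes the Cosmos catalog supports the standard SHOW TABLES command:
# List the containers in the cosmicworks database (assumes the catalog supports SHOW TABLES)
spark.sql("SHOW TABLES FROM cosmosCatalog.cosmicworks").show()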
Ingest data
Create a sample dataset. Then use OLTP to ingest that data to the API for NoSQL container.
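The following Python sketch shows one way to do this. The specific item values are illustrative assumptions, but the column names (id, category, name, quantity, price, and clearance) match the fields that the queries later in this tutorial rely on:
# Create sample data (the specific item values are illustrative assumptions)
products = [
  ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
  ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
]

# Ingest the sample data into the products container through OLTP
spark.createDataFrame(products) \
  .toDF("id", "category", "name", "quantity", "price", "clearance") \
  .write \
  .format("cosmos.oltp") \
  .options(**config) \
  .mode("APPEND") \
  .save()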
Query data
Load OLTP data into a data frame to perform common queries on the data. You can use various syntaxes to filter or query data.
Use spark.read to load the OLTP data into a data-frame object. Use the same configuration you used earlier in this tutorial. Also, set spark.cosmos.read.inferSchema.enabled to true to allow the Spark connector to infer the schema by sampling existing items.
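In Python, this step might look like the following sketch, assuming the same config dictionary defined earlier:
# Load data
df = spark.read.format("cosmos.oltp") \
  .options(**config) \
  .option("spark.cosmos.read.inferSchema.enabled", "true") \
  .load()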
// Load data
val df = spark.read.format("cosmos.oltp")
.options(config)
.option("spark.cosmos.read.inferSchema.enabled", "true")
.load()
Render the schema of the data loaded in the data frame by using printSchema.
# Render schema
df.printSchema()
// Render schema
df.printSchema()
Render data rows where the quantity column is less than 20. Use the where and show functions to perform this query.
# Render filtered data
df.where("quantity < 20") \
.show()
// Render filtered data
df.where("quantity < 20")
.show()
Render the first data row where the clearance column is true. Use the filter function to perform this query.
# Render 1 row of filtered data
df.filter(df.clearance == True) \
.show(1)
// Render 1 row of filtered data
df.filter($"clearance" === true)
.show(1)
Render five rows of data with no filter or truncation. Use the show function to customize the appearance and number of rows that are rendered.
# Render five rows of unfiltered and untruncated data
df.show(5, False)
// Render five rows of unfiltered and untruncated data
df.show(5, false)
Query your data by using this raw NoSQL query string: SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query
rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
rawDf = spark.sql(rawQuery)
rawDf.show()
// Render results of raw query
val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
val rawDf = spark.sql(rawQuery)
rawDf.show()
Perform common operations
When you work with API for NoSQL data in Spark, you can perform partial updates or work with data as raw JSON.
To perform a partial update of an item:
Copy the existing config configuration variable and modify the properties in the new copy. Specifically, configure the write strategy to ItemPatch. Then disable bulk support. Set the columns and mapped operations. Finally, set the default operation type to Set.
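The following Python sketch illustrates these steps. The configuration keys and the patch column mapping string reflect the connector's write options but should be treated as assumptions, as should the target item ID and partition key values, which match the sample data used earlier:
# Copy the configuration and adjust it for a partial update (patch)
configPatch = dict(config)
configPatch["spark.cosmos.write.strategy"] = "ItemPatch"              # write strategy: patch
configPatch["spark.cosmos.write.bulk.enabled"] = "false"              # disable bulk support
configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"  # default operation type
configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"  # column-to-operation mapping

# Target item values are illustrative assumptions that match the sample data
targetItemId = "68719518391"
targetItemPartitionKey = "gear-surf-surfboards"

# Build a one-row data frame that contains only the fields to patch
patchChangeDf = spark.createDataFrame(
  [(targetItemId, targetItemPartitionKey, "Yamba New Surfboard")],
  ["id", "category", "name"]
)

# Write the patch to the container
patchChangeDf.write.format("cosmos.oltp") \
  .options(**configPatch) \
  .mode("APPEND") \
  .save()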
Run a query to review the results of the patch operation. The item should now be named Yamba New Surfboard with no other changes.
# Create and run query
patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
patchDf = spark.sql(patchQuery)
patchDf.show(1)
// Create and run query
val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
val patchDf = spark.sql(patchQuery)
patchDf.show(1)
To work with raw JSON data:
Copy the existing config configuration variable and modify the properties in the new copy. Specifically, change the target container to employees. Then configure the contacts column/field to use raw JSON data.
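A Python sketch of this configuration change follows. The container change is straightforward; the mapping string that marks the contacts column as raw JSON uses the connector's patch column configuration syntax and should be treated as an assumption:
# Copy the configuration and retarget it at the employees container
configRawJson = dict(config)
configRawJson["spark.cosmos.container"] = "employees"

# Treat the contacts column as raw JSON (the exact mapping string is an assumption)
configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).rawJson]"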
The Azure Cosmos DB Spark connector for Spark 3.1 and 3.2 is deprecated because Azure Databricks, Azure Synapse Analytics, and Azure HDInsight no longer provide supported Spark 3.1 or 3.2 runtimes.