Tutorial: Connect to Azure Cosmos DB for NoSQL by using Spark
APPLIES TO: NoSQL
In this tutorial, you use the Azure Cosmos DB Spark connector to read data from or write data to an Azure Cosmos DB for NoSQL account. The tutorial uses Azure Databricks and a Jupyter notebook to illustrate how to integrate with the API for NoSQL from Spark. The examples are in Python and Scala, although you can use any language or interface supported by Spark.
In this tutorial, you learn how to:
- Connect to an API for NoSQL account by using Spark and a Jupyter notebook.
- Create database and container resources.
- Ingest data to the container.
- Query data in the container.
- Perform common operations on items in the container.
To complete this tutorial, you need:
- An existing Azure Cosmos DB for NoSQL account.
- If you have an existing Azure subscription, create a new account.
- No Azure subscription? You can try Azure Cosmos DB free with no credit card required.
- An existing Azure Databricks workspace.
Use your existing Azure Databricks workspace to create a compute cluster ready to use Apache Spark 3.4.x to connect to your Azure Cosmos DB for NoSQL account.
Open your Azure Databricks workspace.
In the workspace interface, create a new cluster. Configure the cluster with these settings, at a minimum:
- Runtime version: 13.3 LTS (Scala 2.12, Spark 3.4.1)

Use the workspace interface to search for Maven packages from Maven Central with a Group ID of com.azure.cosmos.spark. Install the package specifically for Spark 3.4, with an Artifact ID prefixed with azure-cosmos-spark_3-4, to the cluster.

Finally, create a new notebook.
Tip
By default, the notebook is attached to the recently created cluster.
Within the notebook, set online transaction processing (OLTP) configuration settings for the NoSQL account endpoint, account key, database name, and container name.

# Set configuration settings
config = {
  "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>",
  "spark.cosmos.accountKey": "<nosql-account-key>",
  "spark.cosmos.database": "cosmicworks",
  "spark.cosmos.container": "products"
}

// Set configuration settings
val config = Map(
  "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>",
  "spark.cosmos.accountKey" -> "<nosql-account-key>",
  "spark.cosmos.database" -> "cosmicworks",
  "spark.cosmos.container" -> "products"
)

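Hard-coding the account key in a notebook makes it easy to leak. As a minimal sketch, assuming you expose the endpoint and key through environment variables, you could build the same dictionary as follows. The variable names COSMOS_ENDPOINT and COSMOS_KEY and the helper build_config are hypothetical names chosen for this illustration; they aren't part of the connector.

```python
import os


def build_config(database="cosmicworks", container="products"):
    """Build the tutorial's connector settings from environment variables.

    COSMOS_ENDPOINT and COSMOS_KEY are hypothetical names used only in this
    sketch; substitute whatever secret mechanism your workspace provides.
    """
    endpoint = os.environ.get("COSMOS_ENDPOINT")
    key = os.environ.get("COSMOS_KEY")
    if not endpoint or not key:
        raise ValueError("Set COSMOS_ENDPOINT and COSMOS_KEY before building the configuration")
    # Same four keys as the hard-coded config dictionary in this tutorial
    return {
        "spark.cosmos.accountEndpoint": endpoint,
        "spark.cosmos.accountKey": key,
        "spark.cosmos.database": database,
        "spark.cosmos.container": container,
    }
```

Calling build_config() with both variables set returns a dictionary that can stand in for config in the later steps.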
Use the Catalog API to manage account resources such as databases and containers. Then, you can use OLTP to manage data within the container resources.
Configure the Catalog API to manage API for NoSQL resources by using Spark.

# Configure Catalog API
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"])
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])

// Configure Catalog API
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint"))
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))

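The three catalog settings share one prefix and reuse values from the OLTP configuration. The following sketch derives them from the config dictionary so that the catalog name appears in only one place. The helper name catalog_settings is hypothetical; the keys and the class name are the ones this tutorial uses.

```python
def catalog_settings(config, catalog_name="cosmosCatalog"):
    """Return the Spark settings that register a Cosmos DB catalog.

    `config` is expected to hold the account endpoint and key under the
    same keys as the tutorial's OLTP configuration.
    """
    prefix = f"spark.sql.catalog.{catalog_name}"
    return {
        prefix: "com.azure.cosmos.spark.CosmosCatalog",
        f"{prefix}.spark.cosmos.accountEndpoint": config["spark.cosmos.accountEndpoint"],
        f"{prefix}.spark.cosmos.accountKey": config["spark.cosmos.accountKey"],
    }


# In a notebook where `spark` and `config` already exist, apply them with:
# for key, value in catalog_settings(config).items():
#     spark.conf.set(key, value)
```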
Create a new database named cosmicworks by using CREATE DATABASE IF NOT EXISTS.

# Create a database by using the Catalog API
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")

// Create a database by using the Catalog API
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")

Create a new container named products by using CREATE TABLE IF NOT EXISTS. Ensure that you set the partition key path to /category and enable autoscale throughput with a maximum throughput of 1000 request units (RUs) per second.

# Create a products container by using the Catalog API
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')")

// Create a products container by using the Catalog API
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')")

Create another container named employees by using a hierarchical partition key configuration. Use /organization, /department, and /team as the set of partition key paths. Follow that specific order. Also, set the throughput to a manual amount of 400 RUs.

# Create an employees container by using the Catalog API
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')")

// Create an employees container by using the Catalog API
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')")

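The two container statements differ only in the container name, the partition key paths, and the throughput property. As a sketch, a small helper can assemble the statement and keep the hierarchical key paths in the required order. The function create_container_sql is a hypothetical helper written for this illustration; it emits only the TBLPROPERTIES names that appear in this tutorial.

```python
def create_container_sql(database, container, partition_key_paths,
                         manual_throughput=None, autoscale_max_throughput=None):
    """Build a CREATE TABLE statement for the Cosmos DB catalog.

    Exactly one of manual_throughput or autoscale_max_throughput must be set.
    The order of partition_key_paths is preserved, which matters for
    hierarchical partition keys.
    """
    if (manual_throughput is None) == (autoscale_max_throughput is None):
        raise ValueError("Specify exactly one of manual_throughput or autoscale_max_throughput")
    paths = ",".join(partition_key_paths)
    if manual_throughput is not None:
        throughput = f"manualThroughput = '{manual_throughput}'"
    else:
        throughput = f"autoScaleMaxThroughput = '{autoscale_max_throughput}'"
    return (
        f"CREATE TABLE IF NOT EXISTS cosmosCatalog.{database}.{container} "
        f"USING cosmos.oltp "
        f"TBLPROPERTIES(partitionKeyPath = '{paths}', {throughput})"
    )


employees_sql = create_container_sql(
    "cosmicworks", "employees",
    ["/organization", "/department", "/team"],
    manual_throughput=400,
)
# In a notebook with the catalog configured: spark.sql(employees_sql)
```

The resulting string is identical to the employees statement in this step, so it can be passed to spark.sql unchanged.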
Run the notebook cells to validate that your database and containers are created within your API for NoSQL account.
Create a sample dataset. Then use OLTP to ingest that data to the API for NoSQL container.
Create a sample dataset.

# Create sample data
products = (
  ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
  ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True)
)

// Create sample data
val products = Seq(
  ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false),
  ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true)
)

Use spark.createDataFrame and the previously saved OLTP configuration to add sample data to the target container.

# Ingest sample data
spark.createDataFrame(products) \
  .toDF("id", "category", "name", "quantity", "price", "clearance") \
  .write \
  .format("cosmos.oltp") \
  .options(**config) \
  .mode("APPEND") \
  .save()

// Ingest sample data
spark.createDataFrame(products)
  .toDF("id", "category", "name", "quantity", "price", "clearance")
  .write
  .format("cosmos.oltp")
  .options(config)
  .mode("APPEND")
  .save()

Load OLTP data into a data frame to perform common queries on the data. You can use various syntaxes to filter or query data.
Use spark.read to load the OLTP data into a data frame object. Use the same configuration you used earlier in this tutorial. Also, set spark.cosmos.read.inferSchema.enabled to true to allow the Spark connector to infer the schema by sampling existing items.

# Load data
df = spark.read.format("cosmos.oltp") \
  .options(**config) \
  .option("spark.cosmos.read.inferSchema.enabled", "true") \
  .load()

// Load data
val df = spark.read.format("cosmos.oltp")
  .options(config)
  .option("spark.cosmos.read.inferSchema.enabled", "true")
  .load()

Render the schema of the data loaded in the data frame by using printSchema.

# Render schema
df.printSchema()

// Render schema
df.printSchema()

Render data rows where the quantity column is less than 20. Use the where and show functions to perform this query.

# Render filtered data
df.where("quantity < 20") \
  .show()

// Render filtered data
df.where("quantity < 20")
  .show()

Render the first data row where the clearance column is true. Use the filter function to perform this query.

# Render 1 row of filtered data
df.filter(df.clearance == True) \
  .show(1)

// Render 1 row of filtered data
df.filter($"clearance" === true)
  .show(1)

Render five rows of data with no filter or truncation. Use the show function to customize the appearance and number of rows that are rendered.

# Render five rows of unfiltered and untruncated data
df.show(5, False)

// Render five rows of unfiltered and untruncated data
df.show(5, false)

Query your data by using this raw NoSQL query string:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800

# Render results of raw query
rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
rawDf = spark.sql(rawQuery)
rawDf.show()

// Render results of raw query
val rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800"
val rawDf = spark.sql(rawQuery)
rawDf.show()

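To check the query results, it helps to know which items each filter should return. The following plain-Python sketch applies the same three conditions to the two sample products. It assumes the container holds only those two items and runs without Spark, so it models the expected output rather than calling the connector.

```python
# Column names and sample rows copied from the ingestion step
columns = ("id", "category", "name", "quantity", "price", "clearance")
products = (
    ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False),
    ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True),
)
items = [dict(zip(columns, row)) for row in products]

# Same conditions as df.where("quantity < 20"), the clearance filter,
# and the query with WHERE price > 800
low_stock = [item["name"] for item in items if item["quantity"] < 20]
on_clearance = [item["name"] for item in items if item["clearance"]]
over_800 = [item["name"] for item in items if item["price"] > 800]

print(low_stock)     # ['Yamba Surfboard']
print(on_clearance)  # ['Kiama Classic Surfboard']
print(over_800)      # ['Yamba Surfboard']
```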
When you work with API for NoSQL data in Spark, you can perform partial updates or work with data as raw JSON.
To perform a partial update of an item:
Copy the existing config configuration variable and modify the properties in the new copy. Specifically, configure the write strategy to ItemPatch. Then disable bulk support. Set the columns and mapped operations. Finally, set the default operation type to Set.

# Copy and modify configuration
configPatch = dict(config)
configPatch["spark.cosmos.write.strategy"] = "ItemPatch"
configPatch["spark.cosmos.write.bulk.enabled"] = "false"
configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set"
configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"

// Copy and modify configuration
val configPatch = scala.collection.mutable.Map.empty ++ config
configPatch ++= Map(
  "spark.cosmos.write.strategy" -> "ItemPatch",
  "spark.cosmos.write.bulk.enabled" -> "false",
  "spark.cosmos.write.patch.defaultOperationType" -> "Set",
  "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]"
)

Create variables for the item partition key and unique identifier that you intend to target as part of this patch operation.

# Specify target item id and partition key
targetItemId = "68719518391"
targetItemPartitionKey = "gear-surf-surfboards"

// Specify target item id and partition key
val targetItemId = "68719518391"
val targetItemPartitionKey = "gear-surf-surfboards"

Create a set of patch objects to specify the target item and specify fields that should be modified.

# Create set of patch diffs
patchProducts = [{
  "id": targetItemId,
  "category": targetItemPartitionKey,
  "name": "Yamba New Surfboard"
}]

// Create set of patch diffs
val patchProducts = Seq(
  (targetItemId, targetItemPartitionKey, "Yamba New Surfboard")
)

Create a data frame by using the set of patch objects. Use write to perform the patch operation.

# Create data frame
spark.createDataFrame(patchProducts) \
  .write \
  .format("cosmos.oltp") \
  .options(**configPatch) \
  .mode("APPEND") \
  .save()

// Create data frame
patchProducts
  .toDF("id", "category", "name")
  .write
  .format("cosmos.oltp")
  .options(configPatch)
  .mode("APPEND")
  .save()

Run a query to review the results of the patch operation. The item should now be named Yamba New Surfboard with no other changes.

# Create and run query
patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'"
patchDf = spark.sql(patchQuery)
patchDf.show(1)

// Create and run query
val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'"
val patchDf = spark.sql(patchQuery)
patchDf.show(1)

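The expected outcome of the patch can be modeled locally. The sketch below is a plain-Python model of a Set-style partial update, not the connector's implementation: the id and category fields locate the item, and every other field in the patch overwrites the matching field while the rest of the item is left alone. The helper apply_set_patch is hypothetical.

```python
def apply_set_patch(existing_item, patch, key_fields=("id", "category")):
    """Model a Set-style partial update on a single item.

    Key fields must match the existing item; they identify the target and
    are never modified. All other fields in `patch` replace the old values.
    """
    for field in key_fields:
        if existing_item[field] != patch[field]:
            raise ValueError(f"Patch targets a different item: {field} does not match")
    patched = dict(existing_item)  # copy so the original stays untouched
    for field, value in patch.items():
        if field not in key_fields:
            patched[field] = value
    return patched


existing = {
    "id": "68719518391",
    "category": "gear-surf-surfboards",
    "name": "Yamba Surfboard",
    "quantity": 12,
    "price": 850.00,
    "clearance": False,
}
patch = {
    "id": "68719518391",
    "category": "gear-surf-surfboards",
    "name": "Yamba New Surfboard",
}

patched = apply_set_patch(existing, patch)
changed = {field for field in patched if patched[field] != existing[field]}
print(changed)          # {'name'}
print(patched["name"])  # Yamba New Surfboard
```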
To work with raw JSON data:
Copy the existing config configuration variable and modify the properties in the new copy. Specifically, change the target container to employees. Then configure the contacts column/field to use raw JSON data.

# Copy and modify configuration
configRawJson = dict(config)
configRawJson["spark.cosmos.container"] = "employees"
configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"

// Copy and modify configuration
val configRawJson = scala.collection.mutable.Map.empty ++ config
configRawJson ++= Map(
  "spark.cosmos.container" -> "employees",
  "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]"
)

Create a set of employees to ingest into the container.

# Create employee data
employees = (
  ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'),
)

// Create employee data
val employees = Seq(
  ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""")
)

Create a data frame and use write to ingest the employee data.

# Ingest data
spark.createDataFrame(employees) \
  .toDF("id", "organization", "department", "team", "name", "contacts") \
  .write \
  .format("cosmos.oltp") \
  .options(**configRawJson) \
  .mode("APPEND") \
  .save()

// Ingest data
spark.createDataFrame(employees)
  .toDF("id", "organization", "department", "team", "name", "contacts")
  .write
  .format("cosmos.oltp")
  .options(configRawJson)
  .mode("APPEND")
  .save()

Render the data from the data frame by using show. Observe that the contacts column is raw JSON in the output.

# Read and render data
rawJsonDf = spark.read.format("cosmos.oltp") \
  .options(**configRawJson) \
  .load()
rawJsonDf.show()

// Read and render data
val rawJsonDf = spark.read.format("cosmos.oltp")
  .options(configRawJson)
  .load()
rawJsonDf.show()

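Because the contacts column travels as a JSON string, a malformed string is easy to introduce when you write it by hand. One option, shown here as a sketch that uses only Python's standard json module, is to build the string from Python objects and parse it back to confirm that it round-trips. The variable names are illustrative.

```python
import json

# The same contact entries as the employee sample data, as Python objects
contacts = [
    {"type": "phone", "value": "425-555-0117"},
    {"email": "alain@adventure-works.com"},
]

# Serialize to the raw JSON string that goes into the contacts column
contacts_raw = json.dumps(contacts)

# Parse it back to confirm that the string is valid JSON with the same content
parsed = json.loads(contacts_raw)
print(len(parsed))         # 2
print(parsed[0]["value"])  # 425-555-0117
```

The string in contacts_raw can take the place of the handwritten JSON literal in the employee tuple.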
- Apache Spark
- Azure Cosmos DB Catalog API
- Configuration parameter reference
- Azure Cosmos DB Spark Connector Samples
- Migrate from Spark 2.4 to Spark 3.*