Självstudie: Ansluta till Azure Cosmos DB för NoSQL med hjälp av Spark
GÄLLER FÖR: NoSQL
I den här självstudien använder du Azure Cosmos DB Spark-anslutningsappen för att läsa eller skriva data från ett Azure Cosmos DB för NoSQL-konto. I den här självstudien används Azure Databricks och en Jupyter Notebook för att illustrera hur du integrerar med API:et för NoSQL från Spark. Den här självstudien fokuserar på Python och Scala, även om du kan använda valfritt språk eller gränssnitt som stöds av Spark.
I den här självstudien lär du dig att:
- Anslut till ett API för NoSQL-konto med hjälp av Spark och en Jupyter Notebook.
- Skapa databas- och containerresurser.
- Mata in data till containern.
- Fråga efter data i containern.
- Utför vanliga åtgärder på objekt i containern.
Förutsättningar
- Ett befintligt Azure Cosmos DB för NoSQL-konto.
- Om du har en befintlig Azure-prenumeration skapar du ett nytt konto.
- Ingen Azure-prenumeration? Du kan prova Azure Cosmos DB kostnadsfritt utan kreditkort.
- En befintlig Azure Databricks-arbetsyta.
Anslut med Hjälp av Spark och Jupyter
Använd din befintliga Azure Databricks-arbetsyta för att skapa ett beräkningskluster som är redo att använda Apache Spark 3.4.x för att ansluta till ditt Azure Cosmos DB för NoSQL-konto.
Öppna din Azure Databricks-arbetsyta.
Skapa ett nytt kluster i arbetsytans gränssnitt. Konfigurera klustret med de här inställningarna, minst:
Version Värde Körningsversion 13.3 LTS (Scala 2.12, Spark 3.4.1) Använd arbetsytans gränssnitt för att söka efter Maven-paket från Maven Central med ett grupp-ID för
com.azure.cosmos.spark
. Installera paketet specifikt för Spark 3.4 med ett artefakt-ID som är prefix förazure-cosmos-spark_3-4
klustret.Skapa slutligen en ny notebook-fil.
Dricks
Som standard är notebook-filen kopplad till det nyligen skapade klustret.
I notebook-filen anger du konfigurationsinställningar för onlinetransaktionsbearbetning (OLTP) för NoSQL-kontoslutpunkten, databasnamnet och containernamnet.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Skapa en databas och en container
Använd katalog-API:et för att hantera kontoresurser som databaser och containrar. Sedan kan du använda OLTP för att hantera data i containerresurserna.
Konfigurera katalog-API:et för att hantera API för NoSQL-resurser med hjälp av Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Skapa en ny databas med namnet
cosmicworks
med hjälpCREATE DATABASE IF NOT EXISTS
av .# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Skapa en ny container med namnet
products
med hjälpCREATE TABLE IF NOT EXISTS
av . Se till att du anger sökvägen för partitionsnyckeln till/category
och aktiverar dataflöde för automatisk skalning med ett maximalt dataflöde1000
för enheter för begäranden (RU:er) per sekund.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Skapa en annan container med namnet
employees
med hjälp av en hierarkisk partitionsnyckelkonfiguration. Använd/organization
,/department
och/team
som uppsättning partitionsnyckelsökvägar. Följ den specifika ordningen. Ange också dataflödet till en manuell mängd400
RU:er.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Kör notebook-cellerna för att verifiera att databasen och containrarna har skapats i ditt API för NoSQL-kontot.
Mata in data
Skapa en exempeldatauppsättning. Använd sedan OLTP för att mata in dessa data till API:et för NoSQL-containern.
Skapa en exempeldatauppsättning.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
Använd
spark.createDataFrame
och den tidigare sparade OLTP-konfigurationen för att lägga till exempeldata i målcontainern.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Fråga efter data
Läs in OLTP-data i en dataram för att utföra vanliga frågor om data. Du kan använda olika syntaxer för att filtrera eller fråga efter data.
Använd
spark.read
för att läsa in OLTP-data i ett dataramobjekt. Använd samma konfiguration som du använde tidigare i den här självstudien.spark.cosmos.read.inferSchema.enabled
Ange också så atttrue
Spark-anslutningsappen kan härleda schemat genom att sampling av befintliga objekt.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Rendera schemat för data som läses in i dataramen med hjälp
printSchema
av .# Render schema df.printSchema()
// Render schema df.printSchema()
Rendera datarader där
quantity
kolumnen är mindre än20
.where
Använd funktionerna ochshow
för att utföra den här frågan.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Rendera den första dataraden
clearance
där kolumnen ärtrue
.filter
Använd funktionen för att utföra den här frågan.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Rendera fem rader med data utan filter eller trunkering.
show
Använd funktionen för att anpassa utseendet och antalet rader som återges.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Fråga dina data med hjälp av den här råa NoSQL-frågesträngen:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Utföra vanliga åtgärder
När du arbetar med API för NoSQL-data i Spark kan du utföra partiella uppdateringar eller arbeta med data som rå JSON.
Så här utför du en partiell uppdatering av ett objekt:
Kopiera den befintliga
config
konfigurationsvariabeln och ändra egenskaperna i den nya kopian. Mer specifikt konfigurerar du skrivstrategin tillItemPatch
. Inaktivera sedan massstöd. Ange kolumner och mappade åtgärder. Slutligen anger du standardåtgärdstypen tillSet
.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Skapa variabler för objektpartitionsnyckeln och den unika identifierare som du tänker rikta in dig på som en del av den här korrigeringsåtgärden.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Skapa en uppsättning korrigeringsobjekt för att ange målobjektet och ange fält som ska ändras.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Skapa en dataram med hjälp av uppsättningen med korrigeringsobjekt. Använd
write
för att utföra korrigeringsåtgärden.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Kör en fråga för att granska resultatet av korrigeringsåtgärden. Objektet bör nu namnges
Yamba New Surfboard
utan några andra ändringar.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Så här arbetar du med råa JSON-data:
Kopiera den befintliga
config
konfigurationsvariabeln och ändra egenskaperna i den nya kopian. Mer specifikt ändrar du målcontainern tillemployees
. Konfigureracontacts
sedan kolumnen/fältet för att använda råa JSON-data.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Skapa en uppsättning anställda som ska matas in i containern.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Skapa en dataram och använd
write
för att mata in de anställdas data.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Rendera data från dataramen med hjälp
show
av . Observera attcontacts
kolumnen är rå JSON i utdata.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Relaterat innehåll
- Apache Spark
- Katalog-API för Azure Cosmos DB
- Referens för konfigurationsparameter
- Exempel på Azure Cosmos DB Spark-anslutningsapp
- Migrera från Spark 2.4 till Spark 3.*
- Versionskompatibilitet:
- Utgåvan:
- Ladda ned länkar: