Självstudie: Anslut till Azure Cosmos DB för NoSQL med Spark
GÄLLER FÖR: NoSQL
I den här självstudien använder du Azure Cosmos DB Spark-anslutningsappen för att läsa eller skriva data från ett Azure Cosmos DB för NoSQL-konto. I den här självstudien används Azure Databricks och en Jupyter Notebook för att illustrera hur du integrerar med API:et för NoSQL från Spark. Den här självstudien fokuserar på Python och Scala även om du kan använda valfritt språk eller gränssnitt som stöds av Spark.
I den här självstudien lär du dig att:
- Anslut till ett API för NoSQL-konto med spark och en Jupyter-notebook-fil
- Skapa databas- och containerresurser
- Mata in data till containern
- Fråga efter data i containern
- Utföra vanliga åtgärder på objekt i containern
Förutsättningar
- Ett befintligt Azure Cosmos DB för NoSQL-konto.
- Om du har en befintlig Azure-prenumeration skapar du ett nytt konto.
- Ingen Azure-prenumeration? Du kan prova Azure Cosmos DB kostnadsfritt utan kreditkort.
- En befintlig Azure Databricks-arbetsyta.
Anslut med Spark och Jupyter
Använd din befintliga Azure Databricks-arbetsyta för att skapa ett beräkningskluster som är redo att använda Apache Spark 3.4.x för att ansluta till ditt Azure Cosmos DB för NoSQL-konto.
Öppna din Azure Databricks-arbetsyta.
Skapa ett nytt kluster i arbetsytans gränssnitt. Konfigurera klustret med de här inställningarna, minst:
Värde Körningsversion 13.3 LTS (Scala 2.12, Spark 3.4.1) Använd arbetsytans gränssnitt för att söka efter Maven-paket från Maven Central med ett grupp-ID för
com.azure.cosmos.spark
. Installera paketet som är specifikt för Spark 3.4 med ett artefakt-ID som är prefix förazure-cosmos-spark_3-4
klustret.Skapa slutligen en ny notebook-fil.
Dricks
Som standard kopplas notebook-filen till det nyligen skapade klustret.
I notebook-filen anger du OLTP-konfigurationsinställningar för NoSQL-kontoslutpunkt, databasnamn och containernamn.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Skapa en databas och container
Använd katalog-API:et för att hantera kontoresurser som databaser och containrar. Sedan kan du använda OLTP för att hantera data i containerresursen[s].
Konfigurera katalog-API:et för att hantera API för NoSQL-resurser med Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Skapa en ny databas med namnet
cosmicworks
med .CREATE DATABASE IF NOT EXISTS
# Create a database using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Skapa en ny container med namnet
products
med .CREATE TABLE IF NOT EXISTS
Se till att du anger sökvägen för partitionsnyckeln till/category
och aktiverar dataflöde för automatisk skalning med ett maximalt dataflöde1000
för enheter för begäranden per sekund (RU/s).# Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Skapa en annan container med namnet
employees
med en hierarkisk partitionsnyckelkonfiguration med/organization
,/department
och/team
som uppsättning partitionsnyckelsökvägar i den specifika ordningen. Ange också dataflödet till en manuell mängd400
RU/s# Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Kör notebook-cellen för att verifiera att databasen och containrarna har skapats i ditt API för NoSQL-kontot.
Mata in data
Skapa en exempeldatauppsättning och använd sedan OLTP för att mata in dessa data till API:et för NoSQL-containern.
Skapa en exempeldatauppsättning.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
Använd
spark.createDataFrame
och den tidigare sparade OLTP-konfigurationen för att lägga till exempeldata i målcontainern.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Fråga efter data
Läs in OLTP-data i en dataram för att utföra vanliga frågor om data. Du kan använda olika syntaxfilter eller frågedata.
Använd
spark.read
för att läsa in OLTP-data i ett dataramobjekt. Använd samma konfiguration som användes tidigare i den här självstudien.spark.cosmos.read.inferSchema.enabled
Ange också true så att Spark-anslutningsappen kan härleda schemat genom att sampling av befintliga objekt.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Rendera schemat för data som lästs in i dataramen med hjälp av
printSchema
.# Render schema df.printSchema()
// Render schema df.printSchema()
Rendera datarader där
quantity
kolumnen är mindre än20
.where
Använd funktionerna ochshow
för att utföra den här frågan.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Rendera den första dataraden
clearance
där kolumnen är sann.filter
Använd funktionen för att utföra den här frågan.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Rendera fem rader med data utan filter eller trunkering.
show
Använd funktionen för att anpassa utseendet och antalet rader som återges.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Fråga dina data med den här råa NoSQL-frågesträngen:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Utföra vanliga åtgärder
När du arbetar med API för NoSQL-data i Spark kan du utföra partiella uppdateringar eller arbeta med data som rå JSON.
Utför följande steg för att utföra en partiell uppdatering av ett objekt:
Kopiera den befintliga
config
konfigurationsvariabeln och ändra egenskaperna i den nya kopian. Specifikt; konfigurera skrivstrategin tillItemPatch
, inaktivera massstöd, ange kolumner och mappade åtgärder och ange slutligen standardåtgärdstypen tillSet
.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Skapa variabler för objektpartitionsnyckeln och den unika identifierare som du tänker rikta in dig på som en del av den här korrigeringsåtgärden.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Skapa en uppsättning korrigeringsobjekt för att ange målobjektet och ange fält som ska ändras.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Skapa en dataram med hjälp av uppsättningen korrigeringsobjekt och använd
write
för att utföra korrigeringsåtgärden.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Kör en fråga för att granska resultatet av korrigeringsåtgärden. Objektet bör nu namnges
Yamba New Surfboard
utan några andra ändringar.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Utför följande steg för att arbeta med råa JSON-data:
Kopiera den befintliga
config
konfigurationsvariabeln och ändra egenskaperna i den nya kopian. Specifikt; ändra målcontainern tillemployees
och konfigureracontacts
kolumnen/fältet för att använda råa JSON-data.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Skapa en uppsättning anställda som ska matas in i containern.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Skapa en dataram och använd
write
för att mata in de anställdas data.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Rendera data från dataramen med hjälp av
show
. Observera attcontacts
kolumnen är rå JSON i utdata.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Relaterat innehåll
- Apache Spark
- Katalog-API för Azure Cosmos DB
- Referens för konfigurationsparameter
- Exempel på notebook-filen "New York City Taxi data"
- Migrera från Spark 2.4 till Spark 3.*
- Versionskompatibilitet
- Utgåvan
- Ladda ned länkar