Not
Åtkomst till denna sida kräver auktorisation. Du kan prova att logga in eller byta katalog.
Åtkomst till denna sida kräver auktorisation. Du kan prova att byta katalog.
I den här självstudien använder du Azure Cosmos DB Spark-anslutningsappen för att läsa eller skriva data från ett Azure Cosmos DB för NoSQL-konto. I den här självstudien används Azure Databricks och en Jupyter Notebook för att illustrera hur du integrerar med API:et för NoSQL från Spark. Den här självstudien fokuserar på Python och Scala, även om du kan använda valfritt språk eller gränssnitt som stöds av Spark.
I den här tutorialen lär du dig följande:
- Anslut till ett API för NoSQL-konto med hjälp av Spark och en Jupyter Notebook.
- Skapa databas- och containerresurser.
- Mata in data till containern.
- Fråga efter data i containern.
- Utför vanliga åtgärder på objekt i containern.
Förutsättningar
- Ett befintligt Azure Cosmos DB för NoSQL-konto.
- Om du har en befintlig Azure-prenumeration skapar du ett nytt konto.
- En befintlig Azure Databricks-arbetsyta.
Anslut med Hjälp av Spark och Jupyter
Använd din befintliga Azure Databricks-arbetsyta för att skapa ett beräkningskluster som är redo att använda Apache Spark 3.4.x för att ansluta till ditt Azure Cosmos DB för NoSQL-konto.
Öppna din Azure Databricks-arbetsyta.
Skapa ett nytt kluster i arbetsytans gränssnitt. Konfigurera klustret med de här inställningarna, minst:
Utgåva Värde Körningsversion 13.3 LTS (Scala 2.12, Spark 3.4.1) Använd arbetsytans gränssnitt för att söka efter Maven-paket från Maven Central med ett grupp-ID för
com.azure.cosmos.spark. Installera paketet specifikt för Spark 3.4 med ett artefakt-ID som är prefixat medazure-cosmos-spark_3-4till klustret.Skapa en ny anteckningsbok slutligen.
Tips/Råd
Som standard är notebook-filen kopplad till det nyligen skapade klustret.
I notebook-filen anger du konfigurationsinställningar för onlinetransaktionsbearbetning (OLTP) för NoSQL-kontoslutpunkten, databasnamnet och containernamnet.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Skapa en databas och en container
Använd katalog-API:et för att hantera kontoresurser som databaser och containrar. Sedan kan du använda OLTP för att hantera data i containerresurserna.
Konfigurera katalog-API:et för att hantera API för NoSQL-resurser med hjälp av Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))Skapa en ny databas med namnet
cosmicworksmed hjälpCREATE DATABASE IF NOT EXISTSav .# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")Skapa en ny container med namnet
productsmed hjälpCREATE TABLE IF NOT EXISTSav . Säkerställ att du anger partitionsnyckelsökvägen till/categoryoch aktiverar automatisk skalning av genomströmningen med ett maximalt genomflöde på1000begärandenheter (RU:er) per sekund.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))Skapa en annan container med namnet
employeesmed hjälp av en hierarkisk partitionsnyckelkonfiguration. Använd/organization,/departmentoch/teamsom uppsättning partitionsnyckelsökvägar. Följ den specifika ordningen. Ange också genomströmningen till ett manuellt antal400ru:er.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))Kör notebook-cellerna för att verifiera att databasen och containrarna har skapats i ditt API för NoSQL-kontot.
Importera data
Skapa en exempeldatauppsättning. Använd sedan OLTP för att mata in dessa data till API:et för NoSQL-containern.
Skapa en exempeldatauppsättning.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )Använd
spark.createDataFrameoch den tidigare sparade OLTP-konfigurationen för att lägga till exempeldata i målcontainern.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Fråga efter data
Läs in OLTP-data i en dataram för att utföra vanliga frågor om data. Du kan använda olika syntaxer för att filtrera eller fråga efter data.
Använd
spark.readför att läsa in OLTP-data i ett dataramobjekt. Använd samma konfiguration som du använde tidigare i den här självstudien.spark.cosmos.read.inferSchema.enabledAnge också så atttrueSpark-anslutningsappen kan härleda schemat genom att sampling av befintliga objekt.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()Rendera schemat för data som läses in i en dataframe med hjälp av
printSchema.# Render schema df.printSchema()// Render schema df.printSchema()Rendera datarader där
quantitykolumnen är mindre än20. Använd funktionernawhereochshowför att utföra den här frågan.# Render filtered data df.where("quantity < 20") \ .show()// Render filtered data df.where("quantity < 20") .show()Rendera den första dataraden
clearancedär kolumnen ärtrue.filterAnvänd funktionen för att utföra den här frågan.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)Rendera fem rader med data utan filter eller trunkering.
showAnvänd funktionen för att anpassa utseendet och antalet rader som återges.# Render five rows of unfiltered and untruncated data df.show(5, False)// Render five rows of unfiltered and untruncated data df.show(5, false)Fråga dina data med hjälp av den här råa NoSQL-frågesträngen:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Utföra vanliga åtgärder
När du arbetar med API för NoSQL-data i Spark kan du utföra partiella uppdateringar eller arbeta med data som rå JSON.
Så här utför du en partiell uppdatering av ett objekt:
Kopiera den befintliga
configkonfigurationsvariabeln och ändra egenskaperna i den nya kopian. Mer specifikt konfigurerar du skrivstrategin tillItemPatch. Inaktivera sedan massstöd. Ställ in kolumner och kopplade operationer. Slutligen anger du standardåtgärdstypen tillSet.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )Skapa variabler för objektpartitionsnyckeln och den unika identifierare som du tänker rikta in dig på som en del av den här korrigeringsåtgärden.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"Skapa en uppsättning korrigeringsobjekt för att ange målobjektet och ange fält som ska ändras.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )Skapa en dataram med hjälp av uppsättningen med korrigeringsobjekt. Använd
writeför att utföra korrigeringsåtgärden.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()Kör en fråga för att granska resultatet av korrigeringsåtgärden. Objektet bör nu namnges
Yamba New Surfboardutan några andra ändringar.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Så här arbetar du med råa JSON-data:
Kopiera den befintliga
configkonfigurationsvariabeln och ändra egenskaperna i den nya kopian. Mer specifikt ändrar du målcontainern tillemployees. Konfigureracontactssedan kolumnen/fältet för att använda råa JSON-data.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )Skapa en uppsättning anställda som ska matas in i containern.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )Skapa en dataram och använd
writeför att mata in de anställdas data.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()Rendera data från dataramen med hjälp av
show. Observera attcontactskolumnen är rå JSON i utdata.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Relaterat innehåll
- Apache Spark
- Katalog-API för Azure Cosmos DB
- Referens för konfigurationsparameter
- Azure Cosmos DB Spark-anslutningsexempel
- Migrera från Spark 2.4 till Spark 3.*
- Inaktuella versioner:
- Azure Cosmos DB Spark Connector för Spark 3.1 och 3.2 är inaktuell, eftersom det inte längre finns några Spark 3.1- eller 3.2-körningar som stöds i Azure Databricks, Azure Synapse eller Azure HDInsight.
- Migreringsguide för uppdatering från Spark 3.1
- Migreringsguide för uppdatering från Spark 3.2
- Versionskompatibilitet:
- Utgåvan:
- Ladda ned länkar: