Zelfstudie: Verbinding maken met Azure Cosmos DB for NoSQL met behulp van Spark
VAN TOEPASSING OP: NoSQL
In deze zelfstudie gebruikt u de Azure Cosmos DB Spark-connector voor het lezen of schrijven van gegevens uit een Azure Cosmos DB for NoSQL-account. In deze zelfstudie wordt gebruikgemaakt van Azure Databricks en een Jupyter-notebook om te laten zien hoe u integreert met de API voor NoSQL vanuit Spark. Deze zelfstudie is gericht op Python en Scala, hoewel u elke taal of interface kunt gebruiken die wordt ondersteund door Spark.
In deze zelfstudie leert u het volgende:
- Maak verbinding met een API voor NoSQL-account met behulp van Spark en een Jupyter-notebook.
- Database- en containerbronnen maken.
- Gegevens opnemen in de container.
- Query's uitvoeren op gegevens in de container.
- Voer algemene bewerkingen uit op items in de container.
Vereisten
- Een bestaand Azure Cosmos DB for NoSQL-account.
- Als u een bestaand Azure-abonnement hebt, maakt u een nieuw account.
- Geen Azure-abonnement? U kunt Azure Cosmos DB gratis proberen zonder dat er een creditcard is vereist.
- Een bestaande Azure Databricks-werkruimte.
Verbinding maken met behulp van Spark en Jupyter
Gebruik uw bestaande Azure Databricks-werkruimte om een rekencluster te maken dat gereed is voor het gebruik van Apache Spark 3.4.x om verbinding te maken met uw Azure Cosmos DB for NoSQL-account.
Open uw Azure Databricks-werkruimte.
Maak een nieuw cluster in de werkruimte-interface. Configureer het cluster met deze instellingen minimaal:
Versie Weergegeven als Runtime-versie 13.3 LTS (Scala 2.12, Spark 3.4.1) Gebruik de werkruimte-interface om te zoeken naar Maven-pakketten van Maven Central met een groeps-id van
com.azure.cosmos.spark
. Installeer het pakket specifiek voor Spark 3.4 met een artefact-id die is voorafgegaan doorazure-cosmos-spark_3-4
het cluster.Maak ten slotte een nieuw notitieblok.
Tip
Standaard wordt het notebook gekoppeld aan het onlangs gemaakte cluster.
Stel in het notebook configuratie-instellingen voor online transaction processing (OLTP) in voor het NoSQL-accounteindpunt, de databasenaam en de containernaam.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Een database en een container maken
Gebruik de Catalogus-API om accountbronnen, zoals databases en containers, te beheren. Vervolgens kunt u OLTP gebruiken om gegevens binnen de containerbronnen te beheren.
Configureer de Catalogus-API voor het beheren van API voor NoSQL-resources met behulp van Spark.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Maak een nieuwe database met de naam
cosmicworks
met behulp vanCREATE DATABASE IF NOT EXISTS
.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Maak een nieuwe container met de naam
products
met behulp vanCREATE TABLE IF NOT EXISTS
. Zorg ervoor dat u het pad/category
naar de partitiesleutel instelt en doorvoer voor automatische schaalaanpassing inschakelt met een maximale doorvoer van1000
aanvraageenheden (RU's) per seconde.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Maak een andere container met de naam
employees
met behulp van een hiƫrarchische partitiesleutelconfiguratie. Gebruik/organization
,/department
en/team
als de set partitiesleutelpaden. Volg die specifieke volgorde. Stel de doorvoer ook in op een handmatige400
hoeveelheid RU's.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Voer de notebookcellen uit om te controleren of uw database en containers zijn gemaakt in uw API voor NoSQL-account.
Gegevens opnemen
Maak een voorbeeldgegevensset. Gebruik vervolgens OLTP om die gegevens op te nemen in de API voor NoSQL-container.
Maak een voorbeeldgegevensset.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
Gebruik
spark.createDataFrame
en de eerder opgeslagen OLTP-configuratie om voorbeeldgegevens toe te voegen aan de doelcontainer.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Querygegevens
LAAD OLTP-gegevens in een gegevensframe om algemene query's uit te voeren op de gegevens. U kunt verschillende syntaxis gebruiken om gegevens te filteren of op te vragen.
Gebruik
spark.read
dit om de OLTP-gegevens in een gegevensframeobject te laden. Gebruik dezelfde configuratie die u eerder in deze zelfstudie hebt gebruikt. Stel ook inspark.cosmos.read.inferSchema.enabled
dattrue
de Spark-connector het schema kan afleiden door bestaande items te bemonsteren.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Geef het schema weer van de gegevens die in het gegevensframe zijn geladen met behulp van
printSchema
.# Render schema df.printSchema()
// Render schema df.printSchema()
Gegevensrijen weergeven waarin de
quantity
kolom kleiner is dan20
. Gebruik dewhere
enshow
functies om deze query uit te voeren.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Geef de eerste gegevensrij weer waarin de
clearance
kolom zich bevindttrue
. Gebruik defilter
functie om deze query uit te voeren.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Geef vijf rijen met gegevens weer zonder filter of afkapping. Gebruik de
show
functie om het uiterlijk en het aantal rijen aan te passen dat wordt weergegeven.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Voer een query uit op uw gegevens met behulp van deze onbewerkte NoSQL-queryreeks:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Algemene bewerkingen uitvoeren
Wanneer u werkt met API voor NoSQL-gegevens in Spark, kunt u gedeeltelijke updates uitvoeren of met gegevens werken als onbewerkte JSON.
Een gedeeltelijke update van een item uitvoeren:
Kopieer de bestaande configuratievariabele
config
en wijzig de eigenschappen in de nieuwe kopie. Configureer met name de schrijfstrategie naarItemPatch
. Schakel vervolgens bulkondersteuning uit. Stel de kolommen en toegewezen bewerkingen in. Stel ten slotte het standaardbewerkingstypeSet
in op .# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Maak variabelen voor de partitiesleutel van het item en de unieke id die u wilt targeten als onderdeel van deze patchbewerking.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Maak een set patchobjecten om het doelitem op te geven en geef velden op die moeten worden gewijzigd.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Maak een gegevensframe met behulp van de set patchobjecten. Gebruik
write
dit om de patchbewerking uit te voeren.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Voer een query uit om de resultaten van de patchbewerking te bekijken. Het item moet nu worden benoemd
Yamba New Surfboard
zonder andere wijzigingen.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
Werken met onbewerkte JSON-gegevens:
Kopieer de bestaande configuratievariabele
config
en wijzig de eigenschappen in de nieuwe kopie. Wijzig met name de doelcontainer inemployees
. Configureer vervolgens de kolom/hetcontacts
veld voor het gebruik van onbewerkte JSON-gegevens.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Maak een set werknemers die u wilt opnemen in de container.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Maak een gegevensframe en gebruik
write
deze om de werknemersgegevens op te nemen.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Geef de gegevens uit het gegevensframe weer met behulp van
show
. U ziet dat decontacts
kolom onbewerkte JSON in de uitvoer is.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Gerelateerde inhoud
- Apache Spark
- Catalogus-API voor Azure Cosmos DB
- Naslaginformatie over configuratieparameters
- Voorbeelden van Azure Cosmos DB Spark-connector
- Migreren van Spark 2.4 naar Spark 3.*
- Versiecompatibiliteit:
- Opmerkingen bij de release:
- Downloadkoppelingen: