Tutorial: Herstellen einer Verbindung mit Azure Cosmos DB for NoSQL mithilfe von Spark
GILT FÜR: NoSQL
In diesem Tutorial verwenden Sie den Azure Cosmos DB Spark-Connector, um Daten aus einem Azure Cosmos DB for NoSQL-Konto zu lesen oder zu schreiben. In diesem Tutorial werden Azure Databricks und ein Jupyter Notebook verwendet, um zu veranschaulichen, wie sie die API für NoSQL mithilfe von Spark integrieren können. Dieses Tutorial konzentriert sich auf Python und Scala. Sie können allerdings jede beliebige Sprache oder Schnittstelle verwenden, die von Spark unterstützt wird.
In diesem Tutorial lernen Sie Folgendes:
- Herstellen einer Verbindung mit einer API für ein NoSQL-Konto mithilfe von Spark und einem Jupyter-Notebook
- Erstellen Sie Datenbank- und Containerressourcen.
- Erfassen von Daten im Container
- Abfragen von Daten im Container
- Ausführen allgemeiner Vorgänge für Elemente im Container
Voraussetzungen
- Ein vorhandenes Azure Cosmos DB for NoSQL-Konto
- Falls Sie bereits über ein vorhandenes Azure-Abonnement verfügen, erstellen Sie ein neues Konto.
- Kein Azure-Abonnement? Sie können Azure Cosmos DB kostenlos testen, ohne dass eine Kreditkarte erforderlich ist.
- Ein vorhandener Azure Databricks-Arbeitsbereich
Herstellen einer Verbindung mithilfe von Spark und Jupyter
Verwenden Sie Ihren vorhandenen Azure Databricks-Arbeitsbereich, um einen Computecluster zu erstellen, der Apache Spark 3.4.x verwenden kann, um eine Verbindung mit Ihrem Azure Cosmos DB for NoSQL-Konto herzustellen.
Öffnen Sie Ihren Azure Databricks-Arbeitsbereich.
Erstellen Sie auf der Oberfläche des Arbeitsbereichs einen neuen Cluster. Konfigurieren Sie den Cluster mit den folgenden (minimalen) Einstellungen:
Version Wert Laufzeitversion 13.3 LTS (Scala 2.12, Spark 3.4.1) Verwenden Sie die Benutzeroberfläche des Arbeitsbereichs, um in Maven Central nach Maven-Paketen mit der Gruppen-ID
com.azure.cosmos.spark
zu suchen. Installieren Sie das für Spark 3.4 spezifische Paket (der Artefakt-ID des Clusters hat das Präfixazure-cosmos-spark_3-4
).Erstellen Sie zuletzt neues Notebook.
Tipp
Standardmäßig wird das Notebook an den zuletzt erstellten Cluster angefügt.
Legen Sie im Notebook Konfigurationseinstellungen für die Onlinetransaktionsverarbeitung (OLTP) für den NoSQL-Kontoendpunkt, den Datenbanknamen und den Containernamen fest.
# Set configuration settings config = { "spark.cosmos.accountEndpoint": "<nosql-account-endpoint>", "spark.cosmos.accountKey": "<nosql-account-key>", "spark.cosmos.database": "cosmicworks", "spark.cosmos.container": "products" }
# Set configuration settings val config = Map( "spark.cosmos.accountEndpoint" -> "<nosql-account-endpoint>", "spark.cosmos.accountKey" -> "<nosql-account-key>", "spark.cosmos.database" -> "cosmicworks", "spark.cosmos.container" -> "products" )
Erstellen einer Datenbank und eines Containers
Verwenden Sie die Katalog-API, um Kontoressourcen wie Datenbanken und Container zu verwalten. Anschließend können Sie OLTP verwenden, um Daten innerhalb der Containerressourcen zu verwalten.
Konfigurieren Sie mithilfe von Spark die Katalog-API zum Verwalten der API für NoSQL-Ressourcen.
# Configure Catalog Api spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config["spark.cosmos.accountEndpoint"]) spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config["spark.cosmos.accountKey"])
// Configure Catalog Api spark.conf.set(s"spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog") spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", config("spark.cosmos.accountEndpoint")) spark.conf.set(s"spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", config("spark.cosmos.accountKey"))
Erstellen Sie mithilfe von
CREATE DATABASE IF NOT EXISTS
eine neue Datenbank namenscosmicworks
.# Create a database by using the Catalog API spark.sql(f"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
// Create a database by using the Catalog API spark.sql(s"CREATE DATABASE IF NOT EXISTS cosmosCatalog.cosmicworks;")
Erstellen Sie mithilfe von
CREATE TABLE IF NOT EXISTS
einen neuen Container namensproducts
. Stellen Sie sicher, dass Sie den Partitionsschlüsselpfad auf/category
festlegen und automatischen Durchsatz mit einem Maximum von1000
Anforderungseinheiten pro Sekunde (RU/s) aktivieren.# Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
// Create a products container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.products USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/category', autoScaleMaxThroughput = '1000')"))
Erstellen Sie einen weiteren Container namens
employees
mithilfe einer hierarchischen Partitionsschlüsselkonfiguration. Verwenden Sie/organization
,/department
und/team
als Partitionsschlüsselpfade. Halten Sie diese Reihenfolge ein. Legen Sie außerdem den Durchsatz auf eine manuelle Menge von400
RU/s fest.# Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
// Create an employees container by using the Catalog API spark.sql(("CREATE TABLE IF NOT EXISTS cosmosCatalog.cosmicworks.employees USING cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/organization,/department,/team', manualThroughput = '400')"))
Führen Sie die Notebookzellen aus, um zu überprüfen, ob die Datenbank und die Container in Ihrer API für das NoSQL-Konto erstellt werden.
Einlesen von Daten
Erstellen Sie ein Beispieldataset. Verwenden Sie dann OLTP, um diese Daten in die API für NoSQL-Container zu erfassen.
Erstellen Sie ein Beispieldataset.
# Create sample data products = ( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, False), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, True) )
// Create sample data val products = Seq( ("68719518391", "gear-surf-surfboards", "Yamba Surfboard", 12, 850.00, false), ("68719518371", "gear-surf-surfboards", "Kiama Classic Surfboard", 25, 790.00, true) )
Verwenden Sie
spark.createDataFrame
und die zuvor gespeicherte OLTP-Konfiguration, um Beispieldaten zum Zielcontainer hinzuzufügen.# Ingest sample data spark.createDataFrame(products) \ .toDF("id", "category", "name", "quantity", "price", "clearance") \ .write \ .format("cosmos.oltp") \ .options(**config) \ .mode("APPEND") \ .save()
// Ingest sample data spark.createDataFrame(products) .toDF("id", "category", "name", "quantity", "price", "clearance") .write .format("cosmos.oltp") .options(config) .mode("APPEND") .save()
Daten abfragen
Laden Sie OLTP-Daten in einen Datenrahmen, um gängige Abfragen für die Daten auszuführen. Sie können verschiedene Syntaxen verwenden, um Daten zu filtern oder abzufragen.
Verwenden Sie
spark.read
, um die OLTP-Daten in ein Dataframeobjekt zu laden. Verwenden Sie dieselbe Konfiguration, die Sie bereits zuvor in diesem Tutorial verwendet haben. Legen Sie außerdemspark.cosmos.read.inferSchema.enabled
auftrue
fest, damit der Spark-Connector das Schema ableiten kann, indem er vorhandene Elemente sampelt.# Load data df = spark.read.format("cosmos.oltp") \ .options(**config) \ .option("spark.cosmos.read.inferSchema.enabled", "true") \ .load()
// Load data val df = spark.read.format("cosmos.oltp") .options(config) .option("spark.cosmos.read.inferSchema.enabled", "true") .load()
Rendern Sie mithilfe von
printSchema
das Schema der im Dataframe geladenen Daten.# Render schema df.printSchema()
// Render schema df.printSchema()
Rendern Sie Datenzeilen, in denen die Spalte
quantity
kleiner als20
ist. Verwenden Sie die Funktionenwhere
undshow
, um diese Abfrage auszuführen.# Render filtered data df.where("quantity < 20") \ .show()
// Render filtered data df.where("quantity < 20") .show()
Rendern Sie die erste Datenzeile, in der die Spalte
clearance
gleichtrue
ist. Verwenden Sie die Funktionfilter
, um diese Abfrage auszuführen.# Render 1 row of flitered data df.filter(df.clearance == True) \ .show(1)
// Render 1 row of flitered data df.filter($"clearance" === true) .show(1)
Rendern Sie fünf Datenzeilen ohne Filter oder Kürzung. Verwenden Sie die Funktion
show
, um die Darstellung und Anzahl der gerenderten Zeilen anzupassen.# Render five rows of unfiltered and untruncated data df.show(5, False)
// Render five rows of unfiltered and untruncated data df.show(5, false)
Fragen Sie Ihrer Daten mithilfe dieser unformatierten NoSQL-Abfragezeichenfolge ab:
SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800
.# Render results of raw query rawQuery = "SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" rawDf = spark.sql(rawQuery) rawDf.show()
// Render results of raw query val rawQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE price > 800" val rawDf = spark.sql(rawQuery) rawDf.show()
Ausführen gängiger Vorgänge
Wenn Sie in Spark mit der API für NoSQL-Daten arbeiten, können Sie Teilupdates ausführen oder mit Daten im unformatierten JSON-Format arbeiten.
So führen Sie ein Teilupdate eines Elements aus:
Kopieren Sie die vorhandene Konfigurationsvariable
config
, und ändern Sie die Eigenschaften in der neuen Kopie. Konfigurieren Sie insbesondere die Schreibstrategie aufItemPatch
. Deaktivieren Sie dann die Massenunterstützung. Legen Sie die Spalten und zugeordneten Vorgänge fest. Legen Sie schließlich den Standardvorgangstyp aufSet
fest.# Copy and modify configuration configPatch = dict(config) configPatch["spark.cosmos.write.strategy"] = "ItemPatch" configPatch["spark.cosmos.write.bulk.enabled"] = "false" configPatch["spark.cosmos.write.patch.defaultOperationType"] = "Set" configPatch["spark.cosmos.write.patch.columnConfigs"] = "[col(name).op(set)]"
// Copy and modify configuration val configPatch = scala.collection.mutable.Map.empty ++ config configPatch ++= Map( "spark.cosmos.write.strategy" -> "ItemPatch", "spark.cosmos.write.bulk.enabled" -> "false", "spark.cosmos.write.patch.defaultOperationType" -> "Set", "spark.cosmos.write.patch.columnConfigs" -> "[col(name).op(set)]" )
Erstellen Sie Variablen für den Elementpartitionsschlüssel und den eindeutigen Bezeichner, auf den Sie im Rahmen dieses Patchvorgangs als Ziel verwenden möchten.
# Specify target item id and partition key targetItemId = "68719518391" targetItemPartitionKey = "gear-surf-surfboards"
// Specify target item id and partition key val targetItemId = "68719518391" val targetItemPartitionKey = "gear-surf-surfboards"
Erstellen Sie eine Reihe von Patchobjekten, um das Zielelement anzugeben, und geben Sie Felder an, die geändert werden sollen.
# Create set of patch diffs patchProducts = [{ "id": f"{targetItemId}", "category": f"{targetItemPartitionKey}", "name": "Yamba New Surfboard" }]
// Create set of patch diffs val patchProducts = Seq( (targetItemId, targetItemPartitionKey, "Yamba New Surfboard") )
Erstellen Sie einen Dataframe mithilfe der Gruppe von Patchobjekten. Verwenden Sie
write
, um den Patchvorgang auszuführen.# Create data frame spark.createDataFrame(patchProducts) \ .write \ .format("cosmos.oltp") \ .options(**configPatch) \ .mode("APPEND") \ .save()
// Create data frame patchProducts .toDF("id", "category", "name") .write .format("cosmos.oltp") .options(configPatch) .mode("APPEND") .save()
Führen Sie eine Abfrage aus, um die Ergebnisse des Patchvorgangs zu überprüfen. Das Element sollte jetzt
Yamba New Surfboard
benannt werden, ohne dass weitere Änderungen auftreten.# Create and run query patchQuery = f"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '{targetItemId}' AND category = '{targetItemPartitionKey}'" patchDf = spark.sql(patchQuery) patchDf.show(1)
// Create and run query val patchQuery = s"SELECT * FROM cosmosCatalog.cosmicworks.products WHERE id = '$targetItemId' AND category = '$targetItemPartitionKey'" val patchDf = spark.sql(patchQuery) patchDf.show(1)
So arbeiten Sie mit unformatierten JSON-Daten:
Kopieren Sie die vorhandene Konfigurationsvariable
config
, und ändern Sie die Eigenschaften in der neuen Kopie. Ändern Sie insbesondere den Zielcontainer inemployees
. Konfigurieren Sie dann die Spalte/das Feldcontacts
, um unformatierte JSON-Daten zu verwenden.# Copy and modify configuration configRawJson = dict(config) configRawJson["spark.cosmos.container"] = "employees" configRawJson["spark.cosmos.write.patch.columnConfigs"] = "[col(contacts).path(/contacts).op(set).rawJson]"
// Copy and modify configuration val configRawJson = scala.collection.mutable.Map.empty ++ config configRawJson ++= Map( "spark.cosmos.container" -> "employees", "spark.cosmos.write.patch.columnConfigs" -> "[col(contacts).path(/contacts).op(set).rawJson]" )
Erstellen Sie eine Gruppe von Mitarbeitern, die im Container erfasst werden sollen.
# Create employee data employees = ( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", '[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]'), )
// Create employee data val employees = Seq( ("63476388581", "CosmicWorks", "Marketing", "Outside Sales", "Alain Henry", """[ { "type": "phone", "value": "425-555-0117" }, { "email": "alain@adventure-works.com" } ]""") )
Erstellen Sie einen Datenrahmen, und verwenden Sie
write
, um die Mitarbeiterdaten zu erfassen.# Ingest data spark.createDataFrame(employees) \ .toDF("id", "organization", "department", "team", "name", "contacts") \ .write \ .format("cosmos.oltp") \ .options(**configRawJson) \ .mode("APPEND") \ .save()
// Ingest data spark.createDataFrame(employees) .toDF("id", "organization", "department", "team", "name", "contacts") .write .format("cosmos.oltp") .options(configRawJson) .mode("APPEND") .save()
Rendern Sie die Daten aus dem Datenframe mithilfe von
show
. Sie erkennen, dass die Spaltecontacts
in der Ausgabe aus JSON-Rohdaten besteht.# Read and render data rawJsonDf = spark.read.format("cosmos.oltp") \ .options(**configRawJson) \ .load() rawJsonDf.show()
// Read and render data val rawJsonDf = spark.read.format("cosmos.oltp") .options(configRawJson) .load() rawJsonDf.show()
Zugehöriger Inhalt
- Apache Spark
- Azure Cosmos DB-Katalog-API
- Referenz zu Konfigurationsparametern
- Beispiele für den Azure Cosmos DB-Spark-Connector
- Migration von Spark 2.4 zu Spark 3.*
- Versionskompatibilität:
- Versionshinweise:
- Downloadlinks: