Abfragen von Cosmos DB-Daten mit Spark

Abgeschlossen

Nachdem Sie Ihrer Azure Cosmos DB-Datenbank mit aktiviertem Analysespeicher einen verknüpften Dienst hinzugefügt haben, können Sie damit die Daten mithilfe eines Spark-Pools in Ihrem Azure Synapse Analytics-Arbeitsbereich abfragen.

Laden analytischer Azure Cosmos DB-Daten in einen DataFrame

Für eine erste Erkundung oder schnelle Analyse von aus einem mit Azure Cosmos DB verknüpften Dienst stammenden Daten ist es oft am einfachsten, Daten aus einem Container in einen DataFrame zu laden. Dazu verwenden Sie eine von Spark unterstützte Sprache wie PySpark (eine Spark-spezifische Implementierung von Python) oder Scala (eine Java-basierte Sprache, die häufig in Spark zum Einsatz kommt).

Mit dem folgenden PySpark-Code können Sie z. B. einen DataFrame mit dem Namen df aus den Daten im Container my-container laden, der über den verknüpften Dienst my_linked_service verbunden ist, und die ersten 10 Datenzeilen anzeigen:

 df = spark.read
     .format("cosmos.olap")\
     .option("spark.synapse.linkedService", "my_linked_service")\
     .option("spark.cosmos.container", "my-container")\
     .load()

display(df.limit(10))

Angenommen, der Container my-container dient zum Speichern von Objekten, ähnlich wie im folgenden Beispiel:

{
    "productID": 123,
    "productName": "Widget",
    "id": "7248f072-11c3-42b1-a368-...",
    "_rid": "mjMaAL...==",
    "_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
    "_etag": "\"54004b09-0000-2300-...\"",
    "_attachments": "attachments/",
    "_ts": 1655414791
}

Die Ausgabe des PySpark-Codes sieht ähnlich aus wie in der folgenden Tabelle:

_rid _ts productID ProductName id _etag
mjMaAL...== 1655414791 123 Widget 7248f072-11c3-42b1-a368-... 54004b09-0000-2300-...
mjMaAL...== 1655414829 124 Wotsit dc33131c-65c7-421a-a0f7-... 5400ca09-0000-2300-...
mjMaAL...== 1655414835 125 Thingumy ce22351d-78c7-428a-a1h5-... 5400ca09-0000-2300-...
... ... ... ... ... ...

Die Daten werden aus dem Analysespeicher und nicht aus dem Betriebsspeicher in den Container geladen. So wird sichergestellt, dass der Betriebsspeicher nicht durch Abfragen überlastet wird. Zu den Feldern im Analysespeicher gehören die von der Anwendung definierten Felder (in diesem Fall productID und productName) und automatisch erstellte Metadatenfelder.

Nach Laden des DataFrames können Sie mit dessen nativen Methoden die Daten erkunden. Der folgende Code erstellt beispielsweise einen neuen DataFrame, der nur die Spalten productID und productName nach productName geordnet enthält:

products_df = df.select("productID", "productName").orderBy("productName")

display(products_df.limit(10))

Die Ausgabe dieses Codes sieht ähnlich aus wie diese Tabelle:

productID ProductName
125 Thingumy
123 Widget
124 Wotsit
... ...

Schreiben eines DataFrames in einen Cosmos DB-Container

In den meisten HTAP-Szenarien sollten Sie mithilfe des verknüpften Diensts Daten aus dem Analysespeicher in Spark einlesen. Sie können jedoch den Inhalt eines DataFrames in den Container schreiben, wie im folgenden Beispiel gezeigt:

mydf.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

Hinweis

Durch das Schreiben eines DataFrames in einen Container wird der Betriebsspeicher aktualisiert, was sich auf dessen Leistung auswirken kann. Die Änderungen werden dann mit dem Analysespeicher synchronisiert.

Abfragen analytischer Azure Cosmos DB-Daten mit Spark SQL

Spark SQL ist eine Spark-API, die die SQL-Sprachsyntax und Semantik relationaler Datenbanken in einem Spark-Pool bereitstellt. Sie können mit Spark SQL Metadaten für Tabellen definieren, die mit SQL abgefragt werden können.

Der folgende Code erstellt beispielsweise eine Tabelle mit dem Namen Products auf Grundlage des hypothetischen Containers aus den vorherigen Beispielen:

%%sql

-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;

USE mydb;

-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
    spark.synapse.linkedService 'my_linked_service',
    spark.cosmos.container 'my-container'
);

-- Query the table
SELECT productID, productName
FROM products;

Tipp

Das Schlüsselwort %%sql am Anfang des Codes ist ein Magic-Befehl, der den Spark-Pool anweist, den Code als SQL und nicht in der Standardsprache (die normalerweise auf PySpark festgelegt ist) auszuführen.

Bei diesem Ansatz können Sie eine logische Datenbank in Ihrem Spark-Pool erstellen, mit der Sie dann die analytischen Daten in Azure Cosmos DB abfragen können. Auf diese Weise können Sie Workloads zur Datenanalyse und Berichterstellung unterstützen, ohne den Betriebsspeicher in Ihrem Azure Cosmos DB-Konto zu beeinträchtigen.