Fråga Cosmos DB-data med Spark

Slutförd

När du har lagt till en länkad tjänst för din analysarkivaktiverade Azure Cosmos DB-databas kan du använda den för att köra frågor mot data med hjälp av en Spark-pool på din Azure Synapse Analytics-arbetsyta.

Läsa in Azure Cosmos DB-analysdata i en dataram

För inledande utforskning eller snabb analys av data från en länkad Azure Cosmos DB-tjänst är det ofta enklast att läsa in data från en container till en dataram med ett Spark-språk som stöds som PySpark (en Spark-specifik implementering av Python) eller Scala (ett Java-baserat språk som ofta används på Spark).

Följande PySpark-kod kan till exempel användas för att läsa in en dataram med namnet df från data i containern my-container som är ansluten till med hjälp av den my_linked_service länkade tjänsten och visa de första 10 raderna med data:

 df = spark.read
     .format("cosmos.olap")\
     .option("spark.synapse.linkedService", "my_linked_service")\
     .option("spark.cosmos.container", "my-container")\
     .load()

display(df.limit(10))

Anta att containern my-container används för att lagra objekt som liknar följande exempel:

{
    "productID": 123,
    "productName": "Widget",
    "id": "7248f072-11c3-42b1-a368-...",
    "_rid": "mjMaAL...==",
    "_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
    "_etag": "\"54004b09-0000-2300-...\"",
    "_attachments": "attachments/",
    "_ts": 1655414791
}

Utdata från PySpark-koden skulle likna följande tabell:

_rid _ts productID productName id _etag
mjMaAL...== 1655414791 123 Widget 7248f072-11c3-42b1-a368-... 54004b09-0000-2300-...
mjMaAL...== 1655414829 124 Wotsit dc33131c-65c7-421a-a0f7-... 5400ca09-0000-2300-...
mjMaAL...== 1655414835 125 Thingumy ce22351d-78c7-428a-a1h5-... 5400ca09-0000-2300-...
... ... ... ... ... ...

Data läses in från analysarkivet i containern, inte från driftlagret. se till att det inte finns några frågekostnader i driftarkivet. Fälten i analysdatalagret innehåller de programdefinierade fälten (i det här fallet productID och productName) och automatiskt skapade metadatafält.

När du har läst in dataramen kan du använda dess interna metoder för att utforska data. Följande kod skapar till exempel en ny dataram som endast innehåller kolumnerna productID och productName , ordnade efter productName:

products_df = df.select("productID", "productName").orderBy("productName")

display(products_df.limit(10))

Utdata från den här koden skulle se ut ungefär så här:

productID productName
125 Thingumy
123 Widget
124 Wotsit
... ...

Skriva en dataram till en Cosmos DB-container

I de flesta HTAP-scenarier bör du använda den länkade tjänsten för att läsa in data i Spark från analysarkivet. Du kan dock skriva innehållet i en dataram till containern enligt följande exempel:

mydf.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

Kommentar

När du skriver en dataram till en container uppdateras driftlagret och dess prestanda påverkas. Ändringarna synkroniseras sedan till analysarkivet.

Använda Spark SQL för att köra frågor mot Azure Cosmos DB-analysdata

Spark SQL är ett Spark-API som tillhandahåller SQL-språksyntax och relationsdatabassemantik i en Spark-pool. Du kan använda Spark SQL för att definiera metadata för tabeller som kan efterfrågas med hjälp av SQL.

Följande kod skapar till exempel en tabell med namnet Products baserat på den hypotetiska container som användes i föregående exempel:

%%sql

-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;

USE mydb;

-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
    spark.synapse.linkedService 'my_linked_service',
    spark.cosmos.container 'my-container'
);

-- Query the table
SELECT productID, productName
FROM products;

Dricks

Nyckelordet %%sql i början av koden är en magi som instruerar Spark-poolen att köra koden som SQL i stället för standardspråket (som vanligtvis är inställt på PySpark).

Med den här metoden kan du skapa en logisk databas i Spark-poolen som du sedan kan använda för att köra frågor mot analysdata i Azure Cosmos DB för att stödja dataanalys och rapportering av arbetsbelastningar utan att påverka driftlagret i ditt Azure Cosmos DB-konto.