Kopiera data från Azure Cosmos DB till en dedikerad SQL-pool med Apache Spark

Azure Synapse Link för Azure Cosmos DB gör det möjligt för användare att köra analyser i nära realtid över driftdata i Azure Cosmos DB. Det finns dock tillfällen då vissa data måste aggregeras och utökas för att betjäna datalageranvändare. Du kan kurera och exportera Azure Synapse Link-data med bara några få celler i en notebook-fil.

Förutsättningar

Steg

I den här självstudien ansluter du till analysarkivet så att det inte påverkar transaktionslagret (det förbrukar inga enheter för programbegäran). Vi går igenom följande steg:

  1. Läs Azure Cosmos DB HTAP-containern i en Spark-dataram
  2. Aggregera resultatet i en ny dataram
  3. Mata in data i en dedikerad SQL-pool

Spark till SQL Steg 1

Data

I det exemplet använder vi en HTAP-container med namnet RetailSales. Den ingår i en länkad tjänst med namnet ConnectedData och har följande schema:

  • _rid: sträng (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nullable = true)
  • productCode: string (nullable = true)
  • quantity: long (nullable = true)
  • price: long (nullable = true)
  • id: sträng (nullable = true)
  • reklam: long (nullable = true)
  • storeId: long (nullable = true)
  • weekStarting: long (nullable = true)
  • _etag: sträng (nullable = true)

Vi aggregerar försäljningen (kvantitet, intäkt (pris x kvantitet) efter productCode och weekStarting i rapporteringssyfte. Slutligen exporterar vi dessa data till en dedikerad SQL-pooltabell med namnet dbo.productsales.

Konfigurera en Spark Notebook

Skapa en Spark-anteckningsbok med Scala som Spark (Scala) som huvudspråk. Vi använder notebook-filens standardinställning för sessionen.

Läsa data i Spark

Läs Azure Cosmos DB HTAP-containern med Spark i en dataram i den första cellen.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

Aggregera resultatet i en ny dataram

I den andra cellen kör vi den transformering och de aggregeringar som behövs för den nya dataramen innan vi läser in den i en dedikerad SQL-pooldatabas.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

Läs in resultaten i en dedikerad SQL-pool

I den tredje cellen läser vi in data i en dedikerad SQL-pool. Den skapar automatiskt en tillfällig extern tabell, en extern datakälla och ett externt filformat som tas bort när jobbet är klart.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

Fråga resultatet med SQL

Du kan köra frågor mot resultatet med hjälp av en enkel SQL-fråga, till exempel följande SQL-skript:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

Frågan visar följande resultat i diagramläge: Spark till SQL Steg 2

Nästa steg