Share via


Copiare dati da Azure Cosmos DB in un pool SQL dedicato con Apache Spark

Collegamento ad Azure Synapse per Azure Cosmos DB consente agli utenti di eseguire analisi quasi in tempo reale su dati operativi in Azure Cosmos DB. Talvolta, tuttavia, è necessario aggregare e arricchire alcuni dati per gli utenti dei data warehouse. Per curare ed esportare i dati di Collegamento ad Azure Synapse sono sufficienti poche celle in un notebook.

Prerequisiti

Passaggi

In questa esercitazione si eseguirà la connessione all'archivio analitico, in modo da evitare qualsiasi impatto sull'archivio transazionale. Non verranno utilizzate unità richiesta. Si eseguiranno i passaggi seguenti:

  1. Leggere il contenitore HTAP di Azure Cosmos DB in un dataframe Spark
  2. Aggregare i risultati in un nuovo dataframe
  3. Inserire i dati in un pool SQL dedicato

Spark to SQL Steps 1

Dati

Nell'esempio si usa un contenitore HTAP denominato RetailSales, che fa parte di un servizio collegato denominato ConnectedData e presenta lo schema seguente:

  • _rid: string (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nullable = true)
  • productCode: string (nullable = true)
  • quantity: long (nullable = true)
  • price: long (nullable = true)
  • id: string (nullable = true)
  • advertising: long (nullable = true)
  • storeId: long (nullable = true)
  • weekStarting: long (nullable = true)
  • _etag: string (nullable = true)

Si aggregheranno le vendite (quantity, revenue (price x quantity) per productCode e weekStarting a scopo di report. Infine si esporteranno i dati in una tabella del pool SQL dedicato denominata dbo.productsales.

Configurare un notebook Spark

Creare un notebook Spark con Scala (Scala as Spark) come linguaggio principale. Per la sessione si usa l'impostazione predefinita del notebook.

Leggere i dati in Spark

Nella prima cella leggere il contenitore HTAP di Azure Cosmos DB con Spark in un dataframe.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

Aggregare i risultati in un nuovo dataframe

Nella seconda cella si eseguono la trasformazione e le aggregazioni necessarie per il nuovo dataframe prima di caricarlo in un database del pool SQL dedicato.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

Caricare i risultati in un pool SQL dedicato

Nella terza cella si caricano i dati in un pool SQL dedicato. Verranno creati automaticamente una tabella esterna, un'origine dati esterna e un formato di file esterno temporanei che verranno eliminati al termine del processo.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

Eseguire query sui risultati con SQL

È possibile eseguire query sul risultato usando una semplice query SQL, ad esempio lo script SQL seguente:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

La query presenterà i risultati seguenti in modalità grafico: Spark to SQL Steps 2

Passaggi successivi