使用 Apache Spark 將資料從 Azure Cosmos DB 複製到專用 SQL 集區

適用於 Azure Cosmos DB 的 Synapse Link 可讓使用者對 Azure Cosmos DB 中的操作資料執行近乎即時的分析。 不過,有時候有些資料需要進行彙總和擴充,才能服務資料倉儲使用者。 只需透過筆記本的幾個資料格即可策展和匯出 Azure Synapse Link 的資料。

先決條件

步驟

在本教學課程中,您將連線到分析存放區,因此不會影響交易存放區 (其不會取用任何要求單位)。 我們會進行下列步驟:

  1. 將 Azure Cosmos DB HTAP 容器讀入 Spark dataframe 中
  2. 在新的資料框架中彙總結果
  3. 將資料內嵌至專用 SQL 集區

Spark 至 SQL 的步驟 1

資料

在該範例中,我們使用名為 RetailSales 的 HTAP 容器。 此容器是名為 ConnectedData 的連結服務一部分,而且具有下列結構描述:

  • _rid: string (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nullable = true)
  • productCode: string (nullable = true)
  • quantity: long (nullable = true)
  • price: long (nullable = true)
  • id: string (nullable = true)
  • advertising: long (nullable = true)
  • storeId: long (nullable = true)
  • weekStarting: long (nullable = true)
  • _etag: string (nullable = true)

我們會依據 productCodeweekStarting 來彙總銷售額 (數量收益 (價格 x 數量)) 以供報告之用。 最後,我們會將該資料匯出到名為 dbo.productsales 的專用 SQL 集區資料表。

設定 Spark 筆記本

使用 Scala as Spark (Scala) 作為主要語言來建立 Spark 筆記本。 我們會針對工作階段使用筆記本的預設設定。

讀取 Spark 中的資料

使用 Spark 將 Azure Cosmos DB HTAP 容器讀入第一個儲存格的 dataframe 內。

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

在新的資料框架中彙總結果

在第二個資料格中,我們會先執行新資料框架所需的轉換和彙總,然後再將其載入至專用 SQL 集區資料庫。

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

將結果載入到專用 SQL 集區

在第三個資料格中,我們會將資料載入到專用 SQL 集區。 其會自動建立暫存的外部資料表、外部資料來源和外部檔案格式,並於作業完成後立即刪除。

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

使用 SQL 查詢結果

您可以使用簡單的 SQL 查詢 (如下列 SQL 指令碼) 來查詢結果:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

您的查詢會以圖表模式呈現下列結果:Spark 至 SQL 步驟 2

後續步驟