使用 Apache Spark 將資料從 Azure Cosmos DB 複製到專用 SQL 集區
適用於 Azure Cosmos DB 的 Synapse Link 可讓使用者對 Azure Cosmos DB 中的操作資料執行近乎即時的分析。 不過,有時候有些資料需要進行彙總和擴充,才能服務資料倉儲使用者。 只需透過筆記本的幾個資料格即可策展和匯出 Azure Synapse Link 的資料。
先決條件
- 使用下列項目佈建 Synapse 工作區:
- 以具有資料的 HTAP 容器佈建 Azure Cosmos DB 帳戶
- 將 Azure Cosmos DB HTAP 容器連線到工作區
- 進行正確設定以將資料從 Spark 匯入到專用 SQL 集區
步驟
在本教學課程中,您將連線到分析存放區,因此不會影響交易存放區 (其不會取用任何要求單位)。 我們會進行下列步驟:
- 將 Azure Cosmos DB HTAP 容器讀入 Spark dataframe 中
- 在新的資料框架中彙總結果
- 將資料內嵌至專用 SQL 集區
資料
在該範例中,我們使用名為 RetailSales 的 HTAP 容器。 此容器是名為 ConnectedData 的連結服務一部分,而且具有下列結構描述:
- _rid: string (nullable = true)
- _ts: long (nullable = true)
- logQuantity: double (nullable = true)
- productCode: string (nullable = true)
- quantity: long (nullable = true)
- price: long (nullable = true)
- id: string (nullable = true)
- advertising: long (nullable = true)
- storeId: long (nullable = true)
- weekStarting: long (nullable = true)
- _etag: string (nullable = true)
我們會依據 productCode 和 weekStarting 來彙總銷售額 (數量、收益 (價格 x 數量)) 以供報告之用。 最後,我們會將該資料匯出到名為 dbo.productsales
的專用 SQL 集區資料表。
設定 Spark 筆記本
使用 Scala as Spark (Scala) 作為主要語言來建立 Spark 筆記本。 我們會針對工作階段使用筆記本的預設設定。
讀取 Spark 中的資料
使用 Spark 將 Azure Cosmos DB HTAP 容器讀入第一個儲存格的 dataframe 內。
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
在新的資料框架中彙總結果
在第二個資料格中,我們會先執行新資料框架所需的轉換和彙總,然後再將其載入至專用 SQL 集區資料庫。
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
將結果載入到專用 SQL 集區
在第三個資料格中,我們會將資料載入到專用 SQL 集區。 其會自動建立暫存的外部資料表、外部資料來源和外部檔案格式,並於作業完成後立即刪除。
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
使用 SQL 查詢結果
您可以使用簡單的 SQL 查詢 (如下列 SQL 指令碼) 來查詢結果:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]