你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
使用 Apache Spark 将数据从 Azure Cosmos DB 复制到专用 SQL 池
Azure Synapse Link for Azure Cosmos DB 使用户能够对 Azure Cosmos DB 中的操作数据运行准实时分析。 但是,有时需要聚合和丰富某些数据,以便为数据仓库用户提供服务。 只需通过笔记本中的几个单元格即可策展和导出 Azure Synapse Link 数据。
先决条件
- 为 Synapse 工作区预配以下内容:
- 为 Azure Cosmos DB 帐户预配包含数据的 HTAP 容器
- 将 Azure Cosmos DB HTAP 容器连接到工作区
- 使用正确的设置将数据从 Spark 导入专用 SQL 池
步骤
在本教程中,你将连接到分析存储,因此不会对事务存储产生任何影响(它不会消耗任何请求单位)。 我们将执行以下步骤:
- 将 Azure Cosmos DB HTAP 容器读入 Spark 数据帧
- 在新数据帧中聚合结果
- 将数据引入专用 SQL 池中
数据
在该示例中,我们使用名为“RetailSales”的 HTAP 容器。 它属于名为“ConnectedData”的链接服务,具有以下架构:
- _rid: string (nullable = true)
- _ts: long (nullable = true)
- logQuantity: double (nullable = true)
- productCode: string (nullable = true)
- quantity: long (nullable = true)
- price: long (nullable = true)
- id: string (nullable = true)
- advertising: long (nullable = true)
- storeId: long (nullable = true)
- weekStarting: long (nullable = true)
- _etag: string (nullable = true)
为了进行报告,我们将按“productCode”和“weekStarting”聚合销售额(数量、收入 [价格 x 数量]) 。 最后,我们会将该数据导入名为 dbo.productsales
的专用 SQL 池表。
配置 Spark 笔记本
创建一个 Spark 笔记本,以 Scala as Spark (Scala) 作为主要语言。 使用笔记本的默认会话设置。
读取 Spark 中的数据
在第一个单元格中,使用 Spark 将 Azure Cosmos DB HTAP 容器读取到数据帧中。
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
在新数据帧中聚合结果
在第二个单元格中,在将新数据帧加载到专用 SQL 池数据库之前,先运行它所需的转换和聚合。
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
将结果加载到专用 SQL 池中
在第三个单元格中,将数据加载到专用 SQL 池中。 它将自动创建一个临时外部表、外部数据源和外部文件格式,作业完成后会删除这些内容。
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
通过 SQL 查询结果
可以使用简单的 SQL 查询(例如以下 SQL 脚本)查询结果:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]