你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用 Apache Spark 将数据从 Azure Cosmos DB 复制到专用 SQL 池

Azure Synapse Link for Azure Cosmos DB 使用户能够对 Azure Cosmos DB 中的操作数据运行准实时分析。 但是,有时需要聚合和丰富某些数据,以便为数据仓库用户提供服务。 只需通过笔记本中的几个单元格即可策展和导出 Azure Synapse Link 数据。

先决条件

步骤

在本教程中,你将连接到分析存储,因此不会对事务存储产生任何影响(它不会消耗任何请求单位)。 我们将执行以下步骤:

  1. 将 Azure Cosmos DB HTAP 容器读入 Spark 数据帧
  2. 在新数据帧中聚合结果
  3. 将数据引入专用 SQL 池中

从 Spark 到 SQL 的步骤 1

数据

在该示例中,我们使用名为“RetailSales”的 HTAP 容器。 它属于名为“ConnectedData”的链接服务,具有以下架构:

  • _rid: string (nullable = true)
  • _ts: long (nullable = true)
  • logQuantity: double (nullable = true)
  • productCode: string (nullable = true)
  • quantity: long (nullable = true)
  • price: long (nullable = true)
  • id: string (nullable = true)
  • advertising: long (nullable = true)
  • storeId: long (nullable = true)
  • weekStarting: long (nullable = true)
  • _etag: string (nullable = true)

为了进行报告,我们将按“productCode”和“weekStarting”聚合销售额(数量、收入 [价格 x 数量]) 。 最后,我们会将该数据导入名为 dbo.productsales 的专用 SQL 池表。

配置 Spark 笔记本

创建一个 Spark 笔记本,以 Scala as Spark (Scala) 作为主要语言。 使用笔记本的默认会话设置。

读取 Spark 中的数据

在第一个单元格中,使用 Spark 将 Azure Cosmos DB HTAP 容器读取到数据帧中。

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

在新数据帧中聚合结果

在第二个单元格中,在将新数据帧加载到专用 SQL 池数据库之前,先运行它所需的转换和聚合。

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

将结果加载到专用 SQL 池中

在第三个单元格中,将数据加载到专用 SQL 池中。 它将自动创建一个临时外部表、外部数据源和外部文件格式,作业完成后会删除这些内容。

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

通过 SQL 查询结果

可以使用简单的 SQL 查询(例如以下 SQL 脚本)查询结果:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

查询会以图表模式显示以下结果:从 Spark 到 SQL 的步骤 2

后续步骤