在 Microsoft Fabric 中使用 Cosmos DB Spark 连接器与 Cosmos DB 一起工作

可以使用 Spark 和 Azure Cosmos DB Spark 连接器从 Azure Cosmos DB for NoSQL 帐户读取、写入和查询数据。 此外,可以使用连接器创建和管理 Cosmos DB 容器。

使用 Spark 和连接器直接连接到 Cosmos DB 终结点执行操作,这与使用 Spark 从 OneLake 中存储的 Fabric 镜像数据中读取 Cosmos DB 的数据不同。

Cosmos DB Spark 连接器可用于支持反向 ETL 方案,需要在低延迟或高并发的情况下为 Cosmos DB 终结点中的数据提供服务。

注释

反向 ETL(提取、转换、加载)是指从分析系统获取转换的分析数据并将其加载回作系统(如 CRM、ERP、POS 或营销工具)的过程,以便业务团队可以直接在每天使用的应用程序中处理见解。

先决条件

  • 包含数据的现有容器

注释

本文使用名称为CosmosSampleDatabase的数据库和名称为SampleData的容器创建的内置 Cosmos DB 示例。

检索 Cosmos DB 终结点

首先,获取 Fabric 中 Cosmos DB 数据库的终结点。 若要使用 Cosmos DB Spark 连接器进行连接,需要使用此终结点。

  1. 打开 Fabric 门户(https://app.fabric.microsoft.com)。

  2. 导航到现有的 Cosmos DB 数据库。

  3. 在数据库的菜单栏中选择 “设置” 选项。

    Fabric 门户中数据库的“设置”菜单栏选项的屏幕截图。

  4. 在“设置”对话框中,导航到 “连接” 部分。 然后,复制 Cosmos DB NoSQL 数据库 字段的 Endpoint 值。 在后面的步骤中使用此值。

    Fabric 门户中数据库的“设置”对话框的“连接”部分的屏幕截图。

在 Fabric 笔记本中配置 Spark

若要使用 Spark 连接器连接到 Cosmos DB,需要配置自定义 Spark 环境。 本部分介绍如何创建自定义 Spark 环境并上传 Cosmos DB Spark 连接器库。

  1. 从 Maven 存储库(组 ID:com.azure.cosmos.spark)下载适用于 Spark 3.5 的最新 Cosmos DB Spark 连接器库文件。

  2. 创建新的笔记本。

  3. 选择 Spark(Scala)作为要使用的语言。

    笔记本的屏幕截图,其中显示了选择 Spark(Scala)作为首选语言。

  4. 选择“环境”下拉列表。

  5. 检查工作区设置,确保使用的是运行时 1.3(Spark 3.5)。

    显示工作区设置下拉菜单的笔记本的屏幕截图。

  6. 选择“新建环境”。

  7. 输入新环境的名称。

  8. 确保将运行时配置为 1.3(Spark 3.5)。

  9. 从左侧面板中的“库”文件夹中选择“自定义库”。

    显示自定义库选项的环境屏幕截图。

  10. 上传之前下载的两个库 .jar 文件。

  11. 选择“保存”

  12. 选择 “发布”,然后选择“ 全部发布”,最后 发布

  13. 发布后,自定义库应具有成功状态。

    已提交自定义库文件的环境的屏幕截图。

  14. 返回笔记本,单击环境下拉菜单,选择 “更改环境”,然后选中新配置环境的名称。

使用 Spark 进行连接

若要连接到 Fabric 数据库和容器中的 Cosmos DB,请指定从容器读取和写入容器时要使用的连接配置。

  1. 在笔记本中,粘贴之前保留的 Cosmos DB 终结点、数据库和容器名称,然后为 NoSQL 帐户终结点、数据库名称和容器名称设置联机事务处理(OLTP)配置设置。

    // User values for Cosmos DB
    val ENDPOINT = "https://{YourAccountEndpoint....cosmos.fabric.microsoft.com:443/}"
    val DATABASE = "{your-cosmos-artifact-name}"
    val CONTAINER = "{your-container-name}"
    
    // Set configuration settings
    val config = Map(
          "spark.cosmos.accountendpoint" -> ENDPOINT,
          "spark.cosmos.database" -> DATABASE,
          "spark.cosmos.container" -> CONTAINER,
          // auth config options
          "spark.cosmos.accountDataResolverServiceName" -> "com.azure.cosmos.spark.fabric.FabricAccountDataResolver",
          "spark.cosmos.auth.type" -> "AccessToken",
          "spark.cosmos.useGatewayMode" -> "true",
          "spark.cosmos.auth.aad.audience" -> "https://cosmos.azure.com/"
    )
    

从容器查询数据

将 OLTP 数据加载到 DataFrame 中以执行一些基本的 Spark 操作。

  1. 用于 spark.read 将 OLTP 数据加载到 DataFrame 对象中。 使用在上一步中创建的配置。 此外,将 spark.cosmos.read.inferSchema.enabled 设置为 true,以允许 Spark 连接器通过采样现有项来推断架构。

    // Read Cosmos DB container into a dataframe
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. 在 DataFrame 中显示前五行数据。

    // Show the first 5 rows of the dataframe
    df.show(5)
    

    注释

    前面创建的 SampleData 容器包含两个不同的实体,其中包含两个单独的架构、产品和评审。 inferSchema 选项可检测此 Cosmos DB 容器中的两个不同的架构并将其合并。

  3. 显示通过使用 printSchema 数据帧加载的数据的架构,并确保架构与示例文档结构匹配。

    // Render schema    
    df.printSchema()
    

    结果应类似于以下示例:

    
        root
         |-- inventory: integer (nullable = true)
         |-- name: string (nullable = true)
         |-- priceHistory: array (nullable = true)
         |    |-- element: struct (containsNull = true)
         |    |    |-- date: string (nullable = true)
         |    |    |-- price: double (nullable = true)
         |-- stars: integer (nullable = true)
         |-- description: string (nullable = true)
         |-- currentPrice: double (nullable = true)
         |-- reviewDate: string (nullable = true)
         |-- countryOfOrigin: string (nullable = true)
         |-- id: string (nullable = false)
         |-- categoryName: string (nullable = true)
         |-- productId: string (nullable = true)
         |-- firstAvailable: string (nullable = true)
         |-- userName: string (nullable = true)
         |-- docType: string (nullable = true)
    
  4. 可以使用容器中的 docType 属性筛选这两个架构及其数据。 使用 where 函数仅筛选数据帧中的产品。

    // Render filtered rows by specific document type
    val productsDF = df.where("docType = 'product'")
    productsDF.show(5)
    
  5. 显示筛选后的产品实体的模式。

    // Render schema    
    productsDF.printSchema()
    
  6. 使用 filter 函数筛选数据帧以仅显示特定类别中的产品。

    // Render filtered rows by specific document type and categoryName
    val filteredDF = df
    .where("docType = 'product'")
    .filter($"categoryName" === "Computers, Laptops")
    
    filteredDF.show(10)
    

使用 Spark SQL 在 Microsoft Fabric 中查询 Cosmos DB

  1. 配置目录 API,以便使用前面定义的终结点值,使用 Spark 查询在 Fabric 资源中引用和管理 Cosmos DB。

    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", ENDPOINT)
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.type", "AccessToken")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.useGatewayMode", "true")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountDataResolverServiceName", "com.azure.cosmos.spark.fabric.FabricAccountDataResolver")
    
  2. 通过 Spark SQL 函数使用目录信息和 SQL 查询字符串查询数据。

     // Show results of query   
      val queryDF = spark.sql(
      " SELECT " + 
      "  categoryName, " + 
      "  productId, " + 
      "  docType, " + 
      "  name, " + 
      "  currentPrice, " + 
      "  stars " + 
      " FROM cosmosCatalog." + DATABASE + "." + CONTAINER
      )
      queryDF.show(10)
    

    结果显示,单个文档缺少的属性将作为 NULL 值返回,应类似于以下示例:

    +------------------+--------------------+-------+--------------------+------------+-----+
    |      categoryName|           productId|docType|                name|currentPrice|stars|
    +------------------+--------------------+-------+--------------------+------------+-----+
    |Computers, Laptops|77be013f-4036-431...|product|TechCorp SwiftEdg...|     2655.33| NULL|
    |Computers, Laptops|77be013f-4036-431...| review|                NULL|        NULL|    4|
    |Computers, Laptops|77be013f-4036-431...| review|                NULL|        NULL|    1|
    |Computers, Laptops|d4df3f4e-5a90-41e...|product|AeroTech VortexBo...|     2497.71| NULL|
    |Computers, Laptops|d4df3f4e-5a90-41e...| review|                NULL|        NULL|    1|
    |Computers, Laptops|d4df3f4e-5a90-41e...| review|                NULL|        NULL|    2|
    |Computers, Laptops|d4df3f4e-5a90-41e...| review|                NULL|        NULL|    1|
    |Computers, Laptops|d4df3f4e-5a90-41e...| review|                NULL|        NULL|    2|
    |Computers, Laptops|d4df3f4e-5a90-41e...| review|                NULL|        NULL|    5|
    |Computers, Laptops|e8b100f0-166d-43d...|product|NovaTech EdgeBook...|     1387.45| NULL|
    +------------------+--------------------+-------+--------------------+------------+-----+
    
  3. 此示例演示如何在 Cosmos DB 中存储的 JSON 文档中处理嵌入的数组。 首先,查询容器,然后使用 explode 运算符将 priceHistory 数组元素扩展到行中,然后计算存储在产品历史记录中的每个产品的最低价格。

    // Retrieve the product data from the SampleData container
    val productPriceMinDF = spark.sql(
    "SELECT " +
    "  productId, " +
    "  categoryName, " +
    "  name, " +
    "  currentPrice, " +
    "  priceHistory " +
    "FROM cosmosCatalog." + DATABASE + "." + CONTAINER + " " +
    "WHERE " + CONTAINER + ".docType = 'product'"
    )
    
    // Prepare an exploded result set containing one row for every member of the priceHistory array
    val explodedDF = productPriceMinDF
       .withColumn("priceHistory", explode(col("priceHistory")))
       .withColumn("priceDate", col("priceHistory").getField("date"))
       .withColumn("newPrice", col("priceHistory").getField("price"))
    
    // Aggregate just the lowest price ever recorded in the priceHistory
    val lowestPriceDF = explodedDF
       .filter(col("docType") === "product")
       .groupBy("productId", "categoryName", "name")
       .agg(min("newPrice").as("lowestPrice"))
    
    // Show 10 rows of the result data
    lowestPriceDF.show(10)
    

    结果应如下所示。

       +--------------------+--------------------+--------------------+-----------+
       |           productId|        categoryName|                name|lowestPrice|
       +--------------------+--------------------+--------------------+-----------+
       |5d81221f-79ad-4ae...|Accessories, High...|PulseCharge Pro X120|      79.99|
       |9173595c-2b5c-488...|Accessories, Desi...| Elevate ProStand X2|     117.16|
       |a5d1be8f-ef18-484...|Computers, Gaming...|VoltStream Enigma...|     1799.0|
       |c9e3a6ce-432f-496...|Peripherals, Keyb...|HyperKey Pro X77 ...|     117.12|
       |f786eb9e-de01-45f...|    Devices, Tablets|TechVerse TabPro X12|     469.93|
       |59f21059-e9d4-492...|Peripherals, Moni...|GenericGenericPix...|     309.77|
       |074d2d7a-933e-464...|Devices, Smartwat...|  PulseSync Orion X7|     170.43|
       |dba39ca4-f94a-4b6...|Accessories, Desi...|Elevate ProStand ...|      129.0|
       |4775c430-1470-401...|Peripherals, Micr...|EchoStream Pro X7...|     119.65|
       |459a191a-21d1-42f...|Computers, Workst...|VertexPro Ultima ...|     3750.4|
       +--------------------+--------------------+--------------------+-----------+
    

使用 Cosmos DB 通过 Spark 实现反向 ETL

Cosmos DB 是由于其体系结构而成为出色的分析工作负载服务层。 以下示例演示如何对分析数据执行反向 ETL 并使用 Cosmos DB 提供服务。

使用 Spark 在 Fabric 容器中创建 Cosmos DB

  • 创建一个 MinPricePerProduct 容器,使用 Spark 目录 API 和 CREATE TABLE IF NOT EXISTS。 将分区键路径设置为 /id,并将自动缩放吞吐量配置为最小值 1000 RU/s,因为预期容器将保持较小规模。

    // Create a MinPricePerProduct container by using the Catalog API
      val NEW_CONTAINER = "MinPricePerProduct"
    
      spark.sql(
      "CREATE TABLE IF NOT EXISTS cosmosCatalog." + DATABASE + "." + NEW_CONTAINER + " " +
      "USING cosmos.oltp " + 
      "TBLPROPERTIES(partitionKeyPath = '/id', autoScaleMaxThroughput = '1000')"
      )
    

使用 Spark 将数据写入 Fabric 容器中的 Cosmos DB

若要将数据直接写入 Fabric 容器中的 Cosmos DB,需要:

  • 一个格式正确的数据帧,其中包含容器分区键和 id 列。
  • 为所需写入的容器正确地指定配置。
  1. Cosmos DB 中的所有文档都需要 ID 属性,这也是为容器选择的 Products 分区键。 在ProductsDF DataFrame 上创建一个值为productIdid列。

    // Create an id column and copy productId value into it
    val ProductsDF = lowestPriceDF.withColumn("id", col("productId"))
    ProductsDF.show(10)
    
  2. 为要写入的 MinPricePerProduct 容器创建新配置。 spark.cosmos.write.strategy 被设置为 ItemOverwrite值,这意味着将会覆盖具有相同 ID 和分区键的值的任何现有文档。

    // Configure the Cosmos DB connection information for the database and the new container.
    val configWrite = Map(
       "spark.cosmos.accountendpoint" -> ENDPOINT,
       "spark.cosmos.database" -> DATABASE,
       "spark.cosmos.container" -> NEW_CONTAINER,
       "spark.cosmos.write.strategy" -> "ItemOverwrite",
       // auth config options
       "spark.cosmos.accountDataResolverServiceName" -> "com.azure.cosmos.spark.fabric.FabricAccountDataResolver",
       "spark.cosmos.auth.type" -> "AccessToken",
       "spark.cosmos.useGatewayMode" -> "true",
       "spark.cosmos.auth.aad.audience" -> "https://cosmos.azure.com/"
    )
    
  3. 将 DataFrame 写入容器。

    ProductsDF.write
      .format("cosmos.oltp")
      .options(configWrite)
      .mode("APPEND")
      .save()
    
  4. 查询容器以验证它现在是否包含正确的数据。

    // Test that the write operation worked
    val queryString = s"SELECT * FROM cosmosCatalog.$DATABASE.$NEW_CONTAINER"
    val queryDF = spark.sql(queryString)
    queryDF.show(10)
    

    结果应类似于以下示例:

       +--------------------+--------------------+-----------+--------------------+--------------------+
       |                name|        categoryName|lowestPrice|                  id|           productId|
       +--------------------+--------------------+-----------+--------------------+--------------------+
       |PulseCharge Pro X120|Accessories, High...|      79.99|5d81221f-79ad-4ae...|5d81221f-79ad-4ae...|
       | Elevate ProStand X2|Accessories, Desi...|     117.16|9173595c-2b5c-488...|9173595c-2b5c-488...|
       |VoltStream Enigma...|Computers, Gaming...|     1799.0|a5d1be8f-ef18-484...|a5d1be8f-ef18-484...|
       |HyperKey Pro X77 ...|Peripherals, Keyb...|     117.12|c9e3a6ce-432f-496...|c9e3a6ce-432f-496...|
       |TechVerse TabPro X12|    Devices, Tablets|     469.93|f786eb9e-de01-45f...|f786eb9e-de01-45f...|
       |GenericGenericPix...|Peripherals, Moni...|     309.77|59f21059-e9d4-492...|59f21059-e9d4-492...|
       |  PulseSync Orion X7|Devices, Smartwat...|     170.43|074d2d7a-933e-464...|074d2d7a-933e-464...|
       |Elevate ProStand ...|Accessories, Desi...|      129.0|dba39ca4-f94a-4b6...|dba39ca4-f94a-4b6...|
       |EchoStream Pro X7...|Peripherals, Micr...|     119.65|4775c430-1470-401...|4775c430-1470-401...|
       |VertexPro Ultima ...|Computers, Workst...|     3750.4|459a191a-21d1-42f...|459a191a-21d1-42f...|
       +--------------------+--------------------+-----------+--------------------+--------------------+