共用方式為


在 Microsoft Fabric 中,使用 Cosmos DB Spark 連接器來操作 Cosmos DB

你可以使用 Spark 和 Azure Cosmos DB Spark 連接器,從 Azure Cosmos DB for NoSQL 帳號讀取、寫入和查詢資料。 此外,你也可以用這個連接器來建立和管理 Cosmos DB 容器。

使用 Spark 和連接器與使用 Spark 從 OneLake 中 Cosmos DB 的 Fabric 鏡像資料讀取資料不同,因為 Spark 是直接連接到 Cosmos DB 端點執行操作。

Cosmos DB Spark 連接器可用於支援反向 ETL 情境,需要以低延遲或高並發性從 Cosmos DB 端點提供資料。

備註

反向 ETL(擷取、轉換、載入)是指將分析系統中轉換後的分析資料重新載入營運系統(如 CRM、ERP、POS 或行銷工具)的過程,讓業務團隊能直接在日常使用的應用程式中根據洞察採取行動。

先決條件

備註

本文使用內建的 Cosmos DB 範例,資料庫名稱為 CosmosSampleDatabase ,容器名稱為 SampleData

取回 Cosmos DB 端點

首先,取得 Fabric 中 Cosmos DB 資料庫的端點。 此端點必須使用 Cosmos DB Spark 連接器連接。

  1. 開啟 Fabric 入口網站 (https://app.fabric.microsoft.com)。

  2. 流覽至您現有的 Cosmos DB 資料庫。

  3. 在資料庫選單列中選擇 設定 選項。

    Fabric 入口網站中資料庫「設定」選單欄選項的截圖。

  4. 在設定對話框中,進入 連線 區塊。 接著,複製 Endpoint for Cosmos DB NoSQL 資料庫 欄位的值。 這個數值會在後面的步驟使用。

    Fabric 入口網站中資料庫「設定」對話框中「連線」區段的截圖。

在 Fabric 筆記本中設定 Spark

要使用 Spark 連接器連接 Cosmos DB,你需要設定一個自訂的 Spark 環境。 本節將引導您如何建立自訂 Spark 環境並上傳 Cosmos DB Spark 連接器函式庫。

  1. 從 Maven 倉庫(群組 ID:com.azure.cosmos.spark)下載 Spark 3.5 的最新 Cosmos DB Spark 連接器函式庫檔案。

  2. 建立新的筆記本。

  3. 選擇 Spark(Scala)作為你想使用的語言。

    筆記本截圖顯示選擇 Spark(Scala)作為首選語言。

  4. 選擇環境下拉選單。

  5. 檢查你的工作區設定,確保你使用的是執行環境 1.3(Spark 3.5)。

    筆記本截圖顯示工作區設定的下拉選單。

  6. 選擇新增環境

  7. 請為這個新環境取個名字。

  8. 確保運行階段已設定為 Runtime 1.3(Spark 3.5)。

  9. 從左側面板的 Libraries 資料夾選擇自訂函式庫

    顯示自訂函式庫選項的環境截圖。

  10. 上傳你之前下載的兩個函式庫 .jar 檔案。

  11. 選取 [儲存]。

  12. 選擇 發佈,然後選擇 全部發佈,最後選擇 發佈

  13. 一旦發佈,自訂函式庫應該會保持成功狀態。

    包含自訂函式庫檔案的環境截圖。

  14. 回到筆記本,點擊環境下拉選單,選擇 變更環境,並選擇新建立環境名稱,選擇新設定的環境。

使用 Spark 進行連線

要在 Fabric 資料庫和容器中連接你的 Cosmos DB,請指定一個連接設定,以便在讀取和寫入容器時使用。

  1. 在筆記本中,貼上你之前保留的 Cosmos DB 端點、資料庫和容器名稱,然後為 NoSQL 帳號端點、資料庫名稱和容器名稱設定線上交易處理(OLTP)設定。

    // User values for Cosmos DB
    val ENDPOINT = "https://{YourAccountEndpoint....cosmos.fabric.microsoft.com:443/}"
    val DATABASE = "{your-cosmos-artifact-name}"
    val CONTAINER = "{your-container-name}"
    
    // Set configuration settings
    val config = Map(
          "spark.cosmos.accountendpoint" -> ENDPOINT,
          "spark.cosmos.database" -> DATABASE,
          "spark.cosmos.container" -> CONTAINER,
          // auth config options
          "spark.cosmos.accountDataResolverServiceName" -> "com.azure.cosmos.spark.fabric.FabricAccountDataResolver",
          "spark.cosmos.auth.type" -> "AccessToken",
          "spark.cosmos.useGatewayMode" -> "true",
          "spark.cosmos.auth.aad.audience" -> "https://cosmos.azure.com/"
    )
    

從容器查詢資料

將 OLTP 資料載入 DataFrame 以執行一些基本的 Spark 操作。

  1. spark.read 來將 OLTP 資料載入 DataFrame 物件。 請使用前一步建立的設定。 此外,將 spark.cosmos.read.inferSchema.enabled 設為 true,以允許 Spark 連接器透過對現有項目取樣來推斷結構描述。

    // Read Cosmos DB container into a dataframe
    val df = spark.read.format("cosmos.oltp")
      .options(config)
      .option("spark.cosmos.read.inferSchema.enabled", "true")
      .load()
    
  2. 在 DataFrame 中顯示前五列資料。

    // Show the first 5 rows of the dataframe
    df.show(5)
    

    備註

    你之前建立的 SampleData 容器包含兩個不同的實體,分別是 產品審查。 inferSchema 選項會偵測 Cosmos DB 容器內的兩個不同結構並將它們合併。

  3. 顯示載入 DataFrame 的資料結構,透過使用 printSchema 並確認結構與範例文件結構相符。

    // Render schema    
    df.printSchema()
    

    結果應該與以下範例相似:

    
        root
         |-- inventory: integer (nullable = true)
         |-- name: string (nullable = true)
         |-- priceHistory: array (nullable = true)
         |    |-- element: struct (containsNull = true)
         |    |    |-- date: string (nullable = true)
         |    |    |-- price: double (nullable = true)
         |-- stars: integer (nullable = true)
         |-- description: string (nullable = true)
         |-- currentPrice: double (nullable = true)
         |-- reviewDate: string (nullable = true)
         |-- countryOfOrigin: string (nullable = true)
         |-- id: string (nullable = false)
         |-- categoryName: string (nullable = true)
         |-- productId: string (nullable = true)
         |-- firstAvailable: string (nullable = true)
         |-- userName: string (nullable = true)
         |-- docType: string (nullable = true)
    
  4. 這兩個結構及其資料可以透過容器中的 docType 屬性進行過濾。 使用 where 函式來篩選僅包含商品的 DataFrame。

    // Render filtered rows by specific document type
    val productsDF = df.where("docType = 'product'")
    productsDF.show(5)
    
  5. 顯示 篩選產品 實體的結構圖。

    // Render schema    
    productsDF.printSchema()
    
  6. 利用函 filter 式過濾 DataFrame,只顯示特定類別內的產品。

    // Render filtered rows by specific document type and categoryName
    val filteredDF = df
    .where("docType = 'product'")
    .filter($"categoryName" === "Computers, Laptops")
    
    filteredDF.show(10)
    

使用 Spark SQL 查詢 Microsoft Fabric 中的 Cosmos DB

  1. 設定目錄 API,讓你能透過 Spark 查詢,使用先前定義的端點值,在 Fabric 中參考和管理 Cosmos DB 資源。

    spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", ENDPOINT)
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.auth.type", "AccessToken")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.useGatewayMode", "true")
    spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountDataResolverServiceName", "com.azure.cosmos.spark.fabric.FabricAccountDataResolver")
    
  2. 用目錄資訊和 SQL 查詢字串,搭配 Spark SQL 函式來查詢資料。

     // Show results of query   
      val queryDF = spark.sql(
      " SELECT " + 
      "  categoryName, " + 
      "  productId, " + 
      "  docType, " + 
      "  name, " + 
      "  currentPrice, " + 
      "  stars " + 
      " FROM cosmosCatalog." + DATABASE + "." + CONTAINER
      )
      queryDF.show(10)
    

    結果顯示,個別文件中缺少的屬性會以 NULL 值回傳,並應類似以下範例:

    +------------------+--------------------+-------+--------------------+------------+-----+
    |      categoryName|           productId|docType|                name|currentPrice|stars|
    +------------------+--------------------+-------+--------------------+------------+-----+
    |Computers, Laptops|77be013f-4036-431...|product|TechCorp SwiftEdg...|     2655.33| NULL|
    |Computers, Laptops|77be013f-4036-431...| review|                NULL|        NULL|    4|
    |Computers, Laptops|77be013f-4036-431...| review|                NULL|        NULL|    1|
    |Computers, Laptops|d4df3f4e-5a90-41e...|product|AeroTech VortexBo...|     2497.71| NULL|
    |Computers, Laptops|d4df3f4e-5a90-41e...| review|                NULL|        NULL|    1|
    |Computers, Laptops|d4df3f4e-5a90-41e...| review|                NULL|        NULL|    2|
    |Computers, Laptops|d4df3f4e-5a90-41e...| review|                NULL|        NULL|    1|
    |Computers, Laptops|d4df3f4e-5a90-41e...| review|                NULL|        NULL|    2|
    |Computers, Laptops|d4df3f4e-5a90-41e...| review|                NULL|        NULL|    5|
    |Computers, Laptops|e8b100f0-166d-43d...|product|NovaTech EdgeBook...|     1387.45| NULL|
    +------------------+--------------------+-------+--------------------+------------+-----+
    
  3. 這個範例展示了如何在 Cosmos DB 中儲存的 JSON 文件中,使用嵌入陣列來操作。 首先,查詢容器,然後使用運算子 explode 將陣列元素展開 priceHistory 成列,然後計算每個產品在產品歷史中儲存的最低價格。

    // Retrieve the product data from the SampleData container
    val productPriceMinDF = spark.sql(
    "SELECT " +
    "  productId, " +
    "  categoryName, " +
    "  name, " +
    "  currentPrice, " +
    "  priceHistory " +
    "FROM cosmosCatalog." + DATABASE + "." + CONTAINER + " " +
    "WHERE " + CONTAINER + ".docType = 'product'"
    )
    
    // Prepare an exploded result set containing one row for every member of the priceHistory array
    val explodedDF = productPriceMinDF
       .withColumn("priceHistory", explode(col("priceHistory")))
       .withColumn("priceDate", col("priceHistory").getField("date"))
       .withColumn("newPrice", col("priceHistory").getField("price"))
    
    // Aggregate just the lowest price ever recorded in the priceHistory
    val lowestPriceDF = explodedDF
       .filter(col("docType") === "product")
       .groupBy("productId", "categoryName", "name")
       .agg(min("newPrice").as("lowestPrice"))
    
    // Show 10 rows of the result data
    lowestPriceDF.show(10)
    

    結果應該是這樣。

       +--------------------+--------------------+--------------------+-----------+
       |           productId|        categoryName|                name|lowestPrice|
       +--------------------+--------------------+--------------------+-----------+
       |5d81221f-79ad-4ae...|Accessories, High...|PulseCharge Pro X120|      79.99|
       |9173595c-2b5c-488...|Accessories, Desi...| Elevate ProStand X2|     117.16|
       |a5d1be8f-ef18-484...|Computers, Gaming...|VoltStream Enigma...|     1799.0|
       |c9e3a6ce-432f-496...|Peripherals, Keyb...|HyperKey Pro X77 ...|     117.12|
       |f786eb9e-de01-45f...|    Devices, Tablets|TechVerse TabPro X12|     469.93|
       |59f21059-e9d4-492...|Peripherals, Moni...|GenericGenericPix...|     309.77|
       |074d2d7a-933e-464...|Devices, Smartwat...|  PulseSync Orion X7|     170.43|
       |dba39ca4-f94a-4b6...|Accessories, Desi...|Elevate ProStand ...|      129.0|
       |4775c430-1470-401...|Peripherals, Micr...|EchoStream Pro X7...|     119.65|
       |459a191a-21d1-42f...|Computers, Workst...|VertexPro Ultima ...|     3750.4|
       +--------------------+--------------------+--------------------+-----------+
    

使用 Cosmos DB 實作 Spark 的反向 ETL

Cosmos DB 因其架構特性,是分析工作負載中卓越的服務層。 以下範例展示了如何對分析資料執行反向 ETL 並使用 Cosmos DB 來提供資料。

用 Spark 在 Fabric 容器中建立 Cosmos 資料庫

  • 使用 Spark Catalog API 和CREATE TABLE IF NOT EXISTS建立MinPricePerProduct容器。 將分割鍵路徑設為 /id,並將自動擴展吞吐量設定為最小 1000 的 RU/s,因為容器預期會保持小型。

    // Create a MinPricePerProduct container by using the Catalog API
      val NEW_CONTAINER = "MinPricePerProduct"
    
      spark.sql(
      "CREATE TABLE IF NOT EXISTS cosmosCatalog." + DATABASE + "." + NEW_CONTAINER + " " +
      "USING cosmos.oltp " + 
      "TBLPROPERTIES(partitionKeyPath = '/id', autoScaleMaxThroughput = '1000')"
      )
    

用 Spark 在 Fabric 容器中寫入 Cosmos 資料庫

若要直接在 Fabric 容器中寫入 Cosmos 資料庫,您需要:

  • 一個格式正確的資料框架,包含容器分割鍵與 id 欄位。
  • 一個正確指定、針對你想寫入的容器的設定。
  1. Cosmos DB 中的所有文件都需要 id 屬性,這也是容器所選的 Products 分割鍵。 在 DataFrame 上建立idProductsDF一個欄位,值為 productId

    // Create an id column and copy productId value into it
    val ProductsDF = lowestPriceDF.withColumn("id", col("productId"))
    ProductsDF.show(10)
    
  2. 為您要寫入的 MinPricePerProduct 容器建立一個新的配置。 將 spark.cosmos.write.strategy 設為 ItemOverwrite,表示任何具有相同 ID 與分割鍵值的現有文件都會被覆寫。

    // Configure the Cosmos DB connection information for the database and the new container.
    val configWrite = Map(
       "spark.cosmos.accountendpoint" -> ENDPOINT,
       "spark.cosmos.database" -> DATABASE,
       "spark.cosmos.container" -> NEW_CONTAINER,
       "spark.cosmos.write.strategy" -> "ItemOverwrite",
       // auth config options
       "spark.cosmos.accountDataResolverServiceName" -> "com.azure.cosmos.spark.fabric.FabricAccountDataResolver",
       "spark.cosmos.auth.type" -> "AccessToken",
       "spark.cosmos.useGatewayMode" -> "true",
       "spark.cosmos.auth.aad.audience" -> "https://cosmos.azure.com/"
    )
    
  3. 把 DataFrame 寫入容器。

    ProductsDF.write
      .format("cosmos.oltp")
      .options(configWrite)
      .mode("APPEND")
      .save()
    
  4. 查詢容器以驗證它現在包含正確的資料。

    // Test that the write operation worked
    val queryString = s"SELECT * FROM cosmosCatalog.$DATABASE.$NEW_CONTAINER"
    val queryDF = spark.sql(queryString)
    queryDF.show(10)
    

    結果應該與以下範例相似:

       +--------------------+--------------------+-----------+--------------------+--------------------+
       |                name|        categoryName|lowestPrice|                  id|           productId|
       +--------------------+--------------------+-----------+--------------------+--------------------+
       |PulseCharge Pro X120|Accessories, High...|      79.99|5d81221f-79ad-4ae...|5d81221f-79ad-4ae...|
       | Elevate ProStand X2|Accessories, Desi...|     117.16|9173595c-2b5c-488...|9173595c-2b5c-488...|
       |VoltStream Enigma...|Computers, Gaming...|     1799.0|a5d1be8f-ef18-484...|a5d1be8f-ef18-484...|
       |HyperKey Pro X77 ...|Peripherals, Keyb...|     117.12|c9e3a6ce-432f-496...|c9e3a6ce-432f-496...|
       |TechVerse TabPro X12|    Devices, Tablets|     469.93|f786eb9e-de01-45f...|f786eb9e-de01-45f...|
       |GenericGenericPix...|Peripherals, Moni...|     309.77|59f21059-e9d4-492...|59f21059-e9d4-492...|
       |  PulseSync Orion X7|Devices, Smartwat...|     170.43|074d2d7a-933e-464...|074d2d7a-933e-464...|
       |Elevate ProStand ...|Accessories, Desi...|      129.0|dba39ca4-f94a-4b6...|dba39ca4-f94a-4b6...|
       |EchoStream Pro X7...|Peripherals, Micr...|     119.65|4775c430-1470-401...|4775c430-1470-401...|
       |VertexPro Ultima ...|Computers, Workst...|     3750.4|459a191a-21d1-42f...|459a191a-21d1-42f...|
       +--------------------+--------------------+-----------+--------------------+--------------------+