分享方式:


教學課程:從 Delta Lake 到 Azure Cosmos DB for NoSQL 的反向擷取、轉換和載入 (ETL) (使用 Spark OLTP 連接器)

在本教學課程中,您會設定反向 ETL 管線,以將擴充的數據從 Azure Databricks 中的 Delta 數據表移至適用於 NoSQL 的 Azure Cosmos DB。 然後使用適用於 NoSQL 的 Azure Cosmos DB 在線事務處理 (OLTP) Spark 連接器來同步處理數據。

反向 ETL 管線設定的必要條件

使用 Microsoft Entra 設定角色型訪問控制

Azure 受控識別可確保適用於 NoSQL 的 Azure Cosmos DB 安全無密碼驗證,而不需要手動管理認證。 在此必要條件步驟中,設定 Azure Databricks 所自動建立、具有中繼資料讀取權限和 Azure Cosmos DB for NoSQL 帳戶資料寫入權限的使用者指派受控識別。 此步驟會為受控識別同時設定控制平面和資料平面的角色型存取控制角色。

  1. 登入 Azure 入口網站 (https://portal.azure.com)。

  2. 流覽至現有的 Azure Databricks 資源。

  3. 在 [ 基本資訊] 窗格中,找出並流覽至與工作區相關聯的受控資源群組。

  4. 在受控資源群組中,選取隨著工作區自動建立的使用者指派的受控身分識別。

  5. 在 [基本資訊] 窗格中,記錄 [用戶端標識符] 和 [主體] 標識符字段的值。 您稍後會使用此值來指派控件和數據平面角色。

    小提示

    或者,您可以使用 Azure CLI 取得受控識別的主體識別碼。 假設受控識別的名稱是 dbmanagedidentity,請使用 az resource show 命令來取得主體標識符。

    az resource show \
        --resource-group "<name-of-managed-resource-group>" \
        --name "dbmanagedidentity" \
        --resource-type "Microsoft.ManagedIdentity/userAssignedIdentities" \
        --query "{clientId: properties.clientId, principalId: properties.principalId}"
    
  6. 瀏覽至目標 Azure Cosmos DB for NoSQL 帳戶。

  7. 在帳戶的頁面上,選取 [訪問控制][IAM]。

  8. [存取控制] 窗格中,選取 [新增],然後選取 [新增角色指派] 選項,開始將控制平面角色指派給使用者指派的受控識別。

  9. 在指派的角色清單中,選取 Cosmos DB 帳戶讀取者 角色。

  10. 在指派存取權給 使用者、群組或服務主體 的區段中,使用 選取成員 的選項進行互動。

  11. 在成員對話方塊中輸入主體識別碼,以篩選至與 Azure Databricks 相關聯的使用者指派受控識別。 [選取] 該身分識別。

  12. 最後,選取 [ 檢閱 + 指派 ] 以建立控制平面角色指派。

  13. 使用 az cosmosdb sql role assignment create 命令,將 Cosmos DB Built-in Data Contributor 資料平面角色和 / 範圍指派給與 Azure Databricks 相關聯的使用者指派受控識別。

    az cosmosdb sql role assignment create \
        --resource-group "<name-of-resource-group>" \
        --account-name "<name-of-cosmos-nosql-account>" \
        --principal-id "<managed-identity-principal-id>" \
        --role-definition-name "Cosmos DB Built-in Data Contributor" \ --scope "/"
    
  14. 使用 az account show 來取得您的訂用帳戶和租用戶標識碼。 在使用 Spark 連接器並利用 Microsoft Entra 驗證的後續步驟中,這些值是必需的。

    az account show --query '{subscriptionId: id, tenantId: tenantId}'
    

建立 Databricks 筆記本

  1. 流覽至現有的 Azure Databricks 資源,然後開啟工作區 UI。

  2. 如果您還沒有叢集,請建立新的叢集。

    這很重要

    確保叢集具有版本 15.4 或更高版本的執行階段,這些版本提供對 Spark 3.5.0 和 Scala 2.12 的長期支援。 本指南中的其餘步驟假設這些工具版本。

  3. 瀏覽至 [程式庫]>[安裝新的]>[Maven] 以安裝 Maven 套件。

  4. 使用 [群組識別碼] 篩選 com.azure.cosmos.spark 並選取 [成品識別碼]azure-cosmos-spark_3-5_2-12 的套件,以搜尋適用於 Azure Cosmos DB for NoSQL 的 Spark 連接器。

  5. 流覽至 [工作區>][資料夾]>[新增>筆記本],以建立新的筆記本

  6. 將筆記本附加至您的叢集。

在 Azure Databricks 中設定 Spark 連接器

設定 Spark 連接器,以使用 Microsoft Entra 驗證連線到帳戶的容器。 此外,將連接器設定為只針對Spark作業使用有限的輸送量閾值。 若要設定 Spark 連接器,請定義包含認證信息的組態字典,以連線到您的帳戶。 這些認證包括:

價值觀
spark.cosmos.accountEndpoint NoSQL 帳戶端點
spark.cosmos.database 目標資料庫的名稱
spark.cosmos.container 目標容器的名稱
spark.cosmos.auth.type ManagedIdentity
spark.cosmos.auth.aad.clientId 使用者指派之受控識別的用戶端識別碼
spark.cosmos.account.subscriptionId 訂用帳戶的標識碼
spark.cosmos.account.tenantId 相關聯 Microsoft Entra 租用戶的識別碼
spark.cosmos.account.resourceGroupName 資源群組的名稱
spark.cosmos.throughputControl.enabled true
spark.cosmos.throughputControl.name TargetContainerThroughputControl
spark.cosmos.throughputControl.targetThroughputThreshold 0.30
spark.cosmos.throughputControl.globalControl.useDedicatedContainer `false
cosmos_config = {
    # General settings
    "spark.cosmos.accountEndpoint": "<endpoint>",
    "spark.cosmos.database": "products",
    "spark.cosmos.container": "recommendations",
    # Entra authentication settings
    "spark.cosmos.auth.type": "ManagedIdentity",
    "spark.cosmos.account.subscriptionId": "<subscriptionId>",
    "spark.cosmos.account.tenantId": "<tenantId>",
    "spark.cosmos.account.resourceGroupName": "<resourceGroupName>",
    # Throughput control settings
    "spark.cosmos.throughputControl.enabled": "true",
    "spark.cosmos.throughputControl.name": "TargetContainerThroughputControl",
    "spark.cosmos.throughputControl.targetThroughputThreshold": "0.30",
    "spark.cosmos.throughputControl.globalControl.useDedicatedContainer": "false",
}
val cosmosconfig = Map(
  // General settings
  "spark.cosmos.accountEndpoint" -> "<endpoint>",
  "spark.cosmos.database" -> "products",
  "spark.cosmos.container" -> "recommendations",
  // Entra authentication settings
  "spark.cosmos.auth.type" -> "ManagedIdentity",
  "spark.cosmos.account.subscriptionId" -> "<subscriptionId>",
  "spark.cosmos.account.tenantId" -> "<tenantId>",
  "spark.cosmos.account.resourceGroupName" -> "<resourceGroupName>",
  // Throughput control settings
  "spark.cosmos.throughputControl.enabled" -> "true",
  "spark.cosmos.throughputControl.name" -> "TargetContainerThroughputControl",
  "spark.cosmos.throughputControl.targetThroughputThreshold" -> "0.30",
  "spark.cosmos.throughputControl.globalControl.useDedicatedContainer" -> "false"
)

備註

在這裡範例中,目標資料庫的名稱為 products ,而目標容器的名稱為 recommendations

此步驟中指定的輸送量設定可確保配置給目標容器的要求單位 (RU) 只有 30% 可供 Spark 作業使用。

將範例產品建議數據內嵌至 Delta 數據表

建立範例 DataFrame,其中包含使用者的產品建議資訊,並將其寫入名為 recommendations_delta的 Delta 數據表中。 此步驟會模擬資料湖中您打算同步處理至 Azure Cosmos DB for NoSQL 的已策劃並轉換的資料。 寫入 Delta 格式可確保您稍後可以啟用異動數據擷取 (CDC) 以進行增量同步處理。

from pyspark.sql import SparkSession

# Create sample data and convert it to a DataFrame
df = spark.createDataFrame([
    ("yara-lima", "Full-Finger Gloves", "clothing-gloves", 80),
    ("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 90)
], ["id", "productname", "category", "recommendationscore"])

# Write the DataFrame to a Delta table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")
// Create sample data as a sequence and convert it to a DataFrame
val df = Seq(
  ("yara-lima", "Full-Finger Gloves", "clothing-gloves", 12.95),
  ("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 19.99)
).toDF("id", "productname", "category", "recommendationscore") 

// Write the DataFrame to a table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")

將初始數據批次載入至適用於 NoSQL 的 Azure Cosmos DB

接下來,使用 recommendations_delta 格式,將 Delta 數據表讀入 Spark DataFrame,並執行初始批次寫入至適用於 NoSQL cosmos.oltp 的 Azure Cosmos DB。 使用 附加 模式新增數據,而不覆寫目標資料庫和容器中的現有內容。 此步驟可確保在 CDC 開始之前,帳戶中提供所有歷程記錄數據。

# Read the Delta table into a DataFrame
df_delta = spark.read.format("delta").table("recommendations_delta")

# Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Read the Delta table into a DataFrame
val df_delta = spark.read.format("delta").table("recommendations_delta")

// Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(cosmosconfig).save()

啟用與變更資料摘要的串流同步處理

藉由改變資料表的屬性,可在 recommendations_delta 資料表上啟用 Delta Lake 的變更資料摘要 (CDF) 功能。 CDF 可讓 Delta Lake 追蹤所有未來的資料列層級插入、更新和刪除。 啟用此屬性對於對 NoSQL 的 Azure Cosmos DB 執行累加同步非常重要,因為它會公開變更,而不需要比較快照集。

在歷史資料載入後,便可以使用差異變更資料摘要 (CDF) 擷取差異資料表中的變更。 您可以實作批次式或串流式 CDC。

# Enable Change Data Feed (CDF)
spark.sql("""
  ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read the Change Data Capture (CDC) data from the Delta table
cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")

# Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Enable Change Data Feed (CDF)
spark.sql("""
  ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

// Read the Change Data Capture (CDC) data from the Delta table
val cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")

// Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(cosmos_config).save()

使用 NoSQL 查詢驗證數據

寫入至適用於 NoSQL 的 Azure Cosmos DB 之後,請使用相同的帳戶組態,將它查詢回 Spark 以確認數據。 然後;檢查內嵌的數據、執行驗證,或與 Delta Lake 中的其他數據集聯結以進行分析或報告。 適用於 NoSQL 的 Azure Cosmos DB 支援快速且編製索引的讀取,以達到實時查詢效能。

# Load DataFrame
df_cosmos = spark.read.format("cosmos.oltp").options(**cosmos_config).load()

# Run query
df_cosmos.select("id", "productname", "category", "recommendationscore").show()
// Load DataFrame
val dfCosmos = spark.read.format("cosmos.oltp").options(cosmosConfig).load()

// Run query
dfCosmos.select("id", "productname", "category", "recommendationscore").show()