Поделиться через


Руководство по обратному извлечению, преобразованию и загрузке (ETL) из Delta Lake в Azure Cosmos DB для NoSQL с соединителем Spark OLTP

В этом руководстве вы будете настраивать обратный конвейер ETL для перемещения обогащенных данных из таблиц Delta в Azure Databricks в Azure Cosmos DB для NoSQL. Затем вы используете соединитель Spark для OLTP (онлайновой обработки транзакций) в Azure Cosmos DB для базы данных NoSQL, чтобы синхронизировать данные.

Предварительные требования для настройки конвейера обратной обработки ETL

Настройка управления доступом на основе ролей с помощью Microsoft Entra

Управляемые удостоверения Azure обеспечивают безопасную проверку подлинности без пароля в Azure Cosmos DB для NoSQL без ручного управления учетными данными. На этом предварительном этапе необходимо настроить пользовательское управляемое удостоверение, которое Azure Databricks автоматически создает с правами на чтение метаданных и правами на запись данных для вашей учетной записи Azure Cosmos DB для NoSQL. Этот шаг настраивает роли управления доступом на основе ролей как для плоскости управления, так и для плоскости данных управляемого удостоверения.

  1. Войдите на портал Azure (https://portal.azure.com).

  2. Перейдите к существующему ресурсу Azure Databricks.

  3. На панели Essentials найдите и перейдите к управляемой группе ресурсов, связанной с рабочей областью.

  4. В группе управляемых ресурсов выберите управляемую идентичность, назначенную пользователем, которая была автоматически создана вместе с рабочей областью.

  5. Запишите значение полей идентификатора клиента и идентификатора объекта (субъекта) на панели Essentials . Это значение используется позже для назначения ролей управления и плоскости данных.

    Подсказка

    Кроме того, можно получить идентификатор субъекта управляемого удостоверения с помощью Azure CLI. Предположим, что имя управляемого удостоверения — dbmanagedidentity, используйте команду az resource show, чтобы получить идентификатор субъекта.

    az resource show \
        --resource-group "<name-of-managed-resource-group>" \
        --name "dbmanagedidentity" \
        --resource-type "Microsoft.ManagedIdentity/userAssignedIdentities" \
        --query "{clientId: properties.clientId, principalId: properties.principalId}"
    
  6. Перейдите к целевой учетной записи Azure Cosmos DB для NoSQL.

  7. На странице учетной записи выберите Управление доступом (IAM).

  8. В области управления доступом выберите "Добавить", а затем "Добавить назначение роли", чтобы начать процесс назначения роли на уровне управления управляемому удостоверению, назначенному пользователем.

  9. Выберите роль читателя учетной записи Cosmos DB в списке ролей для назначения.

  10. В разделе, посвященном назначению доступа к пользователю, группе или субъекту-службе , взаимодействуют с параметром выбора участников .

  11. В диалоговом окне участников введите идентификатор субъекта для фильтрации управляемого удостоверения, назначаемого пользователем, связанного с Azure Databricks. Выберите эту личность.

  12. Наконец, нажмите кнопку "Рецензирование" и "Назначить ", чтобы создать назначение роли уровня управления.

  13. Используйте команду az cosmosdb sql role assignment create, чтобы назначить роль плоскости данных Cosmos DB Built-in Data Contributor и область / управляемой идентификации от пользователя, связанной с Azure Databricks.

    az cosmosdb sql role assignment create \
        --resource-group "<name-of-resource-group>" \
        --account-name "<name-of-cosmos-nosql-account>" \
        --principal-id "<managed-identity-principal-id>" \
        --role-definition-name "Cosmos DB Built-in Data Contributor" \ --scope "/"
    
  14. Используется az account show для получения идентификаторов подписки и клиента. Эти значения необходимы на более позднем шаге с соединителем Spark с помощью аутентификации Microsoft Entra.

    az account show --query '{subscriptionId: id, tenantId: tenantId}'
    

Создайте записную книжку Databricks

  1. Перейдите к существующему ресурсу Azure Databricks и откройте пользовательский интерфейс рабочей области.

  2. Если у вас еще нет кластера, создайте новый кластер.

    Это важно

    Убедитесь, что кластер имеет Runtime версии 15.4 или выше, которая имеет долгосрочную поддержку для Spark 3.5.0 и Scala 2.12. Оставшиеся шаги в этом руководстве рассчитаны на эти версии инструментов.

  3. Перейдите к библиотекам>, установите новый> и Maven пакет, чтобы установить пакет Maven.

  4. Выполните поиск соединителя Spark для Azure Cosmos DB для NoSQL, используя фильтр идентификатора группы, и выберите пакет с com.azure.cosmos.spark.

  5. Создайте новую записную книжку, выбрав Рабочая область>[папка]>Новое>Записная книжка.

  6. Подключите записную книжку к кластеру.

Настройка соединителя Spark в Azure Databricks

Настройте соединитель Spark для подключения к контейнеру учетной записи с помощью проверки подлинности Microsoft Entra. Кроме того, настройте соединитель только для использования ограниченного порогового значения пропускной способности для операций Spark. Чтобы настроить соединитель Spark, определите словарь конфигурации с учетными данными для подключения к учетной записи. Эти учетные данные включают:

Ценность
spark.cosmos.accountEndpoint Конечная точка учетной записи NoSQL
spark.cosmos.database название целевой базы данных
spark.cosmos.container Имя целевого контейнера
spark.cosmos.auth.type ManagedIdentity
spark.cosmos.auth.aad.clientId Идентификатор клиента управляемой удостоверенности, назначенной пользователем
spark.cosmos.account.subscriptionId Идентификатор подписки
spark.cosmos.account.tenantId Идентификатор связанного клиента Microsoft Entra
spark.cosmos.account.resourceGroupName Имя группы ресурсов
spark.cosmos.throughputControl.enabled true
spark.cosmos.throughputControl.name TargetContainerThroughputControl
spark.cosmos.throughputControl.targetThroughputThreshold 0.30
spark.cosmos.throughputControl.globalControl.useDedicatedContainer ложь
cosmos_config = {
    # General settings
    "spark.cosmos.accountEndpoint": "<endpoint>",
    "spark.cosmos.database": "products",
    "spark.cosmos.container": "recommendations",
    # Entra authentication settings
    "spark.cosmos.auth.type": "ManagedIdentity",
    "spark.cosmos.account.subscriptionId": "<subscriptionId>",
    "spark.cosmos.account.tenantId": "<tenantId>",
    "spark.cosmos.account.resourceGroupName": "<resourceGroupName>",
    # Throughput control settings
    "spark.cosmos.throughputControl.enabled": "true",
    "spark.cosmos.throughputControl.name": "TargetContainerThroughputControl",
    "spark.cosmos.throughputControl.targetThroughputThreshold": "0.30",
    "spark.cosmos.throughputControl.globalControl.useDedicatedContainer": "false",
}
val cosmosconfig = Map(
  // General settings
  "spark.cosmos.accountEndpoint" -> "<endpoint>",
  "spark.cosmos.database" -> "products",
  "spark.cosmos.container" -> "recommendations",
  // Entra authentication settings
  "spark.cosmos.auth.type" -> "ManagedIdentity",
  "spark.cosmos.account.subscriptionId" -> "<subscriptionId>",
  "spark.cosmos.account.tenantId" -> "<tenantId>",
  "spark.cosmos.account.resourceGroupName" -> "<resourceGroupName>",
  // Throughput control settings
  "spark.cosmos.throughputControl.enabled" -> "true",
  "spark.cosmos.throughputControl.name" -> "TargetContainerThroughputControl",
  "spark.cosmos.throughputControl.targetThroughputThreshold" -> "0.30",
  "spark.cosmos.throughputControl.globalControl.useDedicatedContainer" -> "false"
)

Замечание

В этом примере целевая база данных называется products , а целевой контейнер называется recommendations.

Конфигурация пропускной способности, указанная на этом шаге, гарантирует, что для операций Spark доступно только 30% единиц запросов, выделенных целевому контейнеру.

Загрузка данных с примерами рекомендаций по продукту в таблицу Delta

Создайте пример DataFrame с информацией о рекомендациях продуктов для пользователей и запишите его в таблицу Delta с именем recommendations_delta. Этот шаг имитирует курированные преобразованные данные в озере данных, которое планируется синхронизировать с Azure Cosmos DB для NoSQL. Запись в формате Delta обеспечивает возможность последующего включения фиксации изменений данных (CDC) для инкрементной синхронизации.

from pyspark.sql import SparkSession

# Create sample data and convert it to a DataFrame
df = spark.createDataFrame([
    ("yara-lima", "Full-Finger Gloves", "clothing-gloves", 80),
    ("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 90)
], ["id", "productname", "category", "recommendationscore"])

# Write the DataFrame to a Delta table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")
// Create sample data as a sequence and convert it to a DataFrame
val df = Seq(
  ("yara-lima", "Full-Finger Gloves", "clothing-gloves", 12.95),
  ("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 19.99)
).toDF("id", "productname", "category", "recommendationscore") 

// Write the DataFrame to a table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")

Начальная пакетная загрузка данных в Azure Cosmos DB для NoSQL

Затем прочитайте таблицу recommendations_delta Delta в DataFrame Spark и выполните начальную загрузку данных в пакетном режиме в Azure Cosmos DB для NoSQL с использованием cosmos.oltp формата. Используйте режим добавления , чтобы добавить данные без перезаписи существующего содержимого в целевой базе данных и контейнере. Этот шаг гарантирует, что все исторические данные доступны в учетной записи до начала CDC.

# Read the Delta table into a DataFrame
df_delta = spark.read.format("delta").table("recommendations_delta")

# Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Read the Delta table into a DataFrame
val df_delta = spark.read.format("delta").table("recommendations_delta")

// Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(cosmosconfig).save()

Включение стриминговой синхронизации с потоком данных об изменениях

Чтобы включить функцию канала измененных данных (CDF) в Delta Lake, измените свойства таблицы recommendations_delta. CDF позволяет Delta Lake отслеживать все будущие вставки, обновления и удаления на уровне строк. Включение этого свойства важно для выполнения добавочной синхронизации с Azure Cosmos DB для NoSQL, так как оно предоставляет изменения без необходимости сравнивать моментальные снимки.

После загрузки исторических данных изменения в таблице Delta можно зафиксировать с помощью Delta Change Data Feed (CDF). Вы можете реализовать CDC на основе пакетной обработки или потоковой передачи данных.

# Enable Change Data Feed (CDF)
spark.sql("""
  ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read the Change Data Capture (CDC) data from the Delta table
cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")

# Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Enable Change Data Feed (CDF)
spark.sql("""
  ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

// Read the Change Data Capture (CDC) data from the Delta table
val cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")

// Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(cosmos_config).save()

Проверка данных с помощью запросов NoSQL

После записи в Azure Cosmos DB для NoSQL проверьте данные, запросив их обратно в Spark, используя ту же конфигурацию учетной записи. Тогда проверьте принятые данные, выполните проверки данных или объедините с другими наборами данных в Delta Lake для аналитики или отчетности. Azure Cosmos DB для NoSQL поддерживает быстрые индексированные операции чтения для производительности запросов в режиме реального времени.

# Load DataFrame
df_cosmos = spark.read.format("cosmos.oltp").options(**cosmos_config).load()

# Run query
df_cosmos.select("id", "productname", "category", "recommendationscore").show()
// Load DataFrame
val dfCosmos = spark.read.format("cosmos.oltp").options(cosmosConfig).load()

// Run query
dfCosmos.select("id", "productname", "category", "recommendationscore").show()