Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этом руководстве вы будете настраивать обратный конвейер ETL для перемещения обогащенных данных из таблиц Delta в Azure Databricks в Azure Cosmos DB для NoSQL. Затем вы используете соединитель Spark для OLTP (онлайновой обработки транзакций) в Azure Cosmos DB для базы данных NoSQL, чтобы синхронизировать данные.
Предварительные требования для настройки конвейера обратной обработки ETL
- Существующая учетная запись Azure Cosmos DB.
- Если у вас есть подписка Azure, создайте новую учетную запись.
- Существующая рабочая область Azure Databricks.
- Если у вас есть подписка Azure, создайте новую рабочую область.
- Последняя версия Azure CLI.
- Если вы предпочитаете, вы также можете использовать Azure Cloud Shell.
Настройка управления доступом на основе ролей с помощью Microsoft Entra
Управляемые удостоверения Azure обеспечивают безопасную проверку подлинности без пароля в Azure Cosmos DB для NoSQL без ручного управления учетными данными. На этом предварительном этапе необходимо настроить пользовательское управляемое удостоверение, которое Azure Databricks автоматически создает с правами на чтение метаданных и правами на запись данных для вашей учетной записи Azure Cosmos DB для NoSQL. Этот шаг настраивает роли управления доступом на основе ролей как для плоскости управления, так и для плоскости данных управляемого удостоверения.
Войдите на портал Azure (https://portal.azure.com).
Перейдите к существующему ресурсу Azure Databricks.
На панели Essentials найдите и перейдите к управляемой группе ресурсов, связанной с рабочей областью.
В группе управляемых ресурсов выберите управляемую идентичность, назначенную пользователем, которая была автоматически создана вместе с рабочей областью.
Запишите значение полей идентификатора клиента и идентификатора объекта (субъекта) на панели Essentials . Это значение используется позже для назначения ролей управления и плоскости данных.
Подсказка
Кроме того, можно получить идентификатор субъекта управляемого удостоверения с помощью Azure CLI. Предположим, что имя управляемого удостоверения —
dbmanagedidentity, используйте командуaz resource show, чтобы получить идентификатор субъекта.az resource show \ --resource-group "<name-of-managed-resource-group>" \ --name "dbmanagedidentity" \ --resource-type "Microsoft.ManagedIdentity/userAssignedIdentities" \ --query "{clientId: properties.clientId, principalId: properties.principalId}"Перейдите к целевой учетной записи Azure Cosmos DB для NoSQL.
На странице учетной записи выберите Управление доступом (IAM).
В области управления доступом выберите "Добавить", а затем "Добавить назначение роли", чтобы начать процесс назначения роли на уровне управления управляемому удостоверению, назначенному пользователем.
Выберите роль читателя учетной записи Cosmos DB в списке ролей для назначения.
В разделе, посвященном назначению доступа к пользователю, группе или субъекту-службе , взаимодействуют с параметром выбора участников .
В диалоговом окне участников введите идентификатор субъекта для фильтрации управляемого удостоверения, назначаемого пользователем, связанного с Azure Databricks. Выберите эту личность.
Наконец, нажмите кнопку "Рецензирование" и "Назначить ", чтобы создать назначение роли уровня управления.
Используйте команду
az cosmosdb sql role assignment create, чтобы назначить роль плоскости данныхCosmos DB Built-in Data Contributorи область/управляемой идентификации от пользователя, связанной с Azure Databricks.az cosmosdb sql role assignment create \ --resource-group "<name-of-resource-group>" \ --account-name "<name-of-cosmos-nosql-account>" \ --principal-id "<managed-identity-principal-id>" \ --role-definition-name "Cosmos DB Built-in Data Contributor" \ --scope "/"Используется
az account showдля получения идентификаторов подписки и клиента. Эти значения необходимы на более позднем шаге с соединителем Spark с помощью аутентификации Microsoft Entra.az account show --query '{subscriptionId: id, tenantId: tenantId}'
Создайте записную книжку Databricks
Перейдите к существующему ресурсу Azure Databricks и откройте пользовательский интерфейс рабочей области.
Если у вас еще нет кластера, создайте новый кластер.
Это важно
Убедитесь, что кластер имеет Runtime версии 15.4 или выше, которая имеет долгосрочную поддержку для Spark 3.5.0 и Scala 2.12. Оставшиеся шаги в этом руководстве рассчитаны на эти версии инструментов.
Перейдите к библиотекам>, установите новый> и Maven пакет, чтобы установить пакет Maven.
Выполните поиск соединителя Spark для Azure Cosmos DB для NoSQL, используя фильтр идентификатора группы, и выберите пакет с идентификатором артефакта
azure-cosmos-spark_3-5_2-12.Создайте новую записную книжку, выбрав Рабочая область>[папка]>Новое>Записная книжка.
Подключите записную книжку к кластеру.
Настройка соединителя Spark в Azure Databricks
Настройте соединитель Spark для подключения к контейнеру учетной записи с помощью проверки подлинности Microsoft Entra. Кроме того, настройте соединитель только для использования ограниченного порогового значения пропускной способности для операций Spark. Чтобы настроить соединитель Spark, определите словарь конфигурации с учетными данными для подключения к учетной записи. Эти учетные данные включают:
| Ценность | |
|---|---|
spark.cosmos.accountEndpoint |
Конечная точка учетной записи NoSQL |
spark.cosmos.database |
название целевой базы данных |
spark.cosmos.container |
Имя целевого контейнера |
spark.cosmos.auth.type |
ManagedIdentity |
spark.cosmos.auth.aad.clientId |
Идентификатор клиента управляемой удостоверенности, назначенной пользователем |
spark.cosmos.account.subscriptionId |
Идентификатор подписки |
spark.cosmos.account.tenantId |
Идентификатор связанного клиента Microsoft Entra |
spark.cosmos.account.resourceGroupName |
Имя группы ресурсов |
spark.cosmos.throughputControl.enabled |
true |
spark.cosmos.throughputControl.name |
TargetContainerThroughputControl |
spark.cosmos.throughputControl.targetThroughputThreshold |
0.30 |
spark.cosmos.throughputControl.globalControl.useDedicatedContainer |
ложь |
cosmos_config = {
# General settings
"spark.cosmos.accountEndpoint": "<endpoint>",
"spark.cosmos.database": "products",
"spark.cosmos.container": "recommendations",
# Entra authentication settings
"spark.cosmos.auth.type": "ManagedIdentity",
"spark.cosmos.account.subscriptionId": "<subscriptionId>",
"spark.cosmos.account.tenantId": "<tenantId>",
"spark.cosmos.account.resourceGroupName": "<resourceGroupName>",
# Throughput control settings
"spark.cosmos.throughputControl.enabled": "true",
"spark.cosmos.throughputControl.name": "TargetContainerThroughputControl",
"spark.cosmos.throughputControl.targetThroughputThreshold": "0.30",
"spark.cosmos.throughputControl.globalControl.useDedicatedContainer": "false",
}
val cosmosconfig = Map(
// General settings
"spark.cosmos.accountEndpoint" -> "<endpoint>",
"spark.cosmos.database" -> "products",
"spark.cosmos.container" -> "recommendations",
// Entra authentication settings
"spark.cosmos.auth.type" -> "ManagedIdentity",
"spark.cosmos.account.subscriptionId" -> "<subscriptionId>",
"spark.cosmos.account.tenantId" -> "<tenantId>",
"spark.cosmos.account.resourceGroupName" -> "<resourceGroupName>",
// Throughput control settings
"spark.cosmos.throughputControl.enabled" -> "true",
"spark.cosmos.throughputControl.name" -> "TargetContainerThroughputControl",
"spark.cosmos.throughputControl.targetThroughputThreshold" -> "0.30",
"spark.cosmos.throughputControl.globalControl.useDedicatedContainer" -> "false"
)
Замечание
В этом примере целевая база данных называется products , а целевой контейнер называется recommendations.
Конфигурация пропускной способности, указанная на этом шаге, гарантирует, что для операций Spark доступно только 30% единиц запросов, выделенных целевому контейнеру.
Загрузка данных с примерами рекомендаций по продукту в таблицу Delta
Создайте пример DataFrame с информацией о рекомендациях продуктов для пользователей и запишите его в таблицу Delta с именем recommendations_delta. Этот шаг имитирует курированные преобразованные данные в озере данных, которое планируется синхронизировать с Azure Cosmos DB для NoSQL. Запись в формате Delta обеспечивает возможность последующего включения фиксации изменений данных (CDC) для инкрементной синхронизации.
from pyspark.sql import SparkSession
# Create sample data and convert it to a DataFrame
df = spark.createDataFrame([
("yara-lima", "Full-Finger Gloves", "clothing-gloves", 80),
("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 90)
], ["id", "productname", "category", "recommendationscore"])
# Write the DataFrame to a Delta table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")
// Create sample data as a sequence and convert it to a DataFrame
val df = Seq(
("yara-lima", "Full-Finger Gloves", "clothing-gloves", 12.95),
("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 19.99)
).toDF("id", "productname", "category", "recommendationscore")
// Write the DataFrame to a table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")
Начальная пакетная загрузка данных в Azure Cosmos DB для NoSQL
Затем прочитайте таблицу recommendations_delta Delta в DataFrame Spark и выполните начальную загрузку данных в пакетном режиме в Azure Cosmos DB для NoSQL с использованием cosmos.oltp формата. Используйте режим добавления , чтобы добавить данные без перезаписи существующего содержимого в целевой базе данных и контейнере. Этот шаг гарантирует, что все исторические данные доступны в учетной записи до начала CDC.
# Read the Delta table into a DataFrame
df_delta = spark.read.format("delta").table("recommendations_delta")
# Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Read the Delta table into a DataFrame
val df_delta = spark.read.format("delta").table("recommendations_delta")
// Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(cosmosconfig).save()
Включение стриминговой синхронизации с потоком данных об изменениях
Чтобы включить функцию канала измененных данных (CDF) в Delta Lake, измените свойства таблицы recommendations_delta. CDF позволяет Delta Lake отслеживать все будущие вставки, обновления и удаления на уровне строк. Включение этого свойства важно для выполнения добавочной синхронизации с Azure Cosmos DB для NoSQL, так как оно предоставляет изменения без необходимости сравнивать моментальные снимки.
После загрузки исторических данных изменения в таблице Delta можно зафиксировать с помощью Delta Change Data Feed (CDF). Вы можете реализовать CDC на основе пакетной обработки или потоковой передачи данных.
# Enable Change Data Feed (CDF)
spark.sql("""
ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# Read the Change Data Capture (CDC) data from the Delta table
cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")
# Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Enable Change Data Feed (CDF)
spark.sql("""
ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
// Read the Change Data Capture (CDC) data from the Delta table
val cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")
// Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(cosmos_config).save()
Проверка данных с помощью запросов NoSQL
После записи в Azure Cosmos DB для NoSQL проверьте данные, запросив их обратно в Spark, используя ту же конфигурацию учетной записи. Тогда проверьте принятые данные, выполните проверки данных или объедините с другими наборами данных в Delta Lake для аналитики или отчетности. Azure Cosmos DB для NoSQL поддерживает быстрые индексированные операции чтения для производительности запросов в режиме реального времени.
# Load DataFrame
df_cosmos = spark.read.format("cosmos.oltp").options(**cosmos_config).load()
# Run query
df_cosmos.select("id", "productname", "category", "recommendationscore").show()
// Load DataFrame
val dfCosmos = spark.read.format("cosmos.oltp").options(cosmosConfig).load()
// Run query
dfCosmos.select("id", "productname", "category", "recommendationscore").show()