Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Dans ce tutoriel, vous allez configurer un pipeline ETL inverse pour déplacer des données enrichies de tables Delta dans Azure Databricks vers Azure Cosmos DB pour NoSQL. Vous utilisez ensuite le connecteur Spark OLTP (Online Transaction Processing) pour Azure Cosmos DB pour NoSQL pour synchroniser les données.
Conditions préalables pour la configuration du pipeline ETL inversée
- Compte Azure Cosmos DB existant.
- Si vous disposez d’un abonnement Azure, créez un compte.
- Un espace de travail Azure Databricks existant.
- Si vous disposez d’un abonnement Azure, créez un espace de travail.
- Dernière version d’Azure CLI.
- Si vous préférez, vous pouvez également utiliser Azure Cloud Shell.
Configurer le contrôle d’accès en fonction du rôle avec Microsoft Entra
Les identités managées Azure garantissent une authentification sécurisée sans mot de passe auprès d’Azure Cosmos DB pour NoSQL sans gérer manuellement les informations d’identification. Dans cette étape requise, configurez l’identité managée assignée par l’utilisateur qu’Azure Databricks crée automatiquement avec l’accès en lecture aux métadonnées et l’accès en écriture aux données pour votre compte Azure Cosmos DB pour NoSQL. Cette étape configure les rôles de contrôle et de contrôle d’accès en fonction du plan de données pour l’identité managée.
Connectez-vous au portail Azure (https://portal.azure.com).
Accédez à la ressource Azure Databricks existante.
Dans le volet Essentials , recherchez et accédez au groupe de ressources managé associé à l’espace de travail.
Dans le groupe de ressources managés, sélectionnez l’identité managée affectée par l’utilisateur qui a été créée automatiquement avec l’espace de travail.
Enregistrez la valeur des champs d’ID client et d’IDd’objet (principal) dans le volet Essentials . Vous utilisez cette valeur ultérieurement pour attribuer des rôles de contrôle et de plan de données.
Conseil / Astuce
Vous pouvez également obtenir l’ID principal de l’identité managée à l’aide d’Azure CLI. En supposant que le nom de l’identité managée est
dbmanagedidentity, utilisez laaz resource showcommande pour obtenir l’ID du principal.az resource show \ --resource-group "<name-of-managed-resource-group>" \ --name "dbmanagedidentity" \ --resource-type "Microsoft.ManagedIdentity/userAssignedIdentities" \ --query "{clientId: properties.clientId, principalId: properties.principalId}"Accédez au compte Azure Cosmos DB for NoSQL cible.
Dans la page du compte, sélectionnez Contrôle d’accès (IAM) .
Dans le volet Contrôle d’accès , sélectionnez l’option Ajouter , puis ajoutez des options d’attribution de rôle pour commencer le processus d’attribution d’un rôle de plan de contrôle à l’identité managée affectée par l’utilisateur.
Sélectionnez le rôle Lecteur de compte Cosmos DB dans la liste des rôles à attribuer.
Dans la section pour attribuer l’accès à un utilisateur, un groupe ou un principal de service , interagissez avec l’option Sélectionner les membres .
Dans la boîte de dialogue des membres, saisissez l’ID du principal pour filtrer l’identité managée attribuée à l’utilisateur associée à Azure Databricks. Sélectionnez cette identité.
Enfin, sélectionnez Vérifier + Attribuer pour créer l’attribution de rôle de plan de contrôle.
Utilisez la commande
az cosmosdb sql role assignment createpour affecter le rôle de plan de donnéesCosmos DB Built-in Data Contributoret l'étendue/à l'identité managée assignée par l'utilisateur associée à Azure Databricks.az cosmosdb sql role assignment create \ --resource-group "<name-of-resource-group>" \ --account-name "<name-of-cosmos-nosql-account>" \ --principal-id "<managed-identity-principal-id>" \ --role-definition-name "Cosmos DB Built-in Data Contributor" \ --scope "/"Utilisez
az account showpour obtenir vos identificateurs d’abonnement et de locataire. Ces valeurs sont requises dans une étape ultérieure avec le connecteur Spark à l’aide de l’authentification Microsoft Entra.az account show --query '{subscriptionId: id, tenantId: tenantId}'
Créer un notebook Databricks
Accédez à la ressource Azure Databricks existante, puis ouvrez l’interface utilisateur de l’espace de travail.
Si vous n’avez pas encore de cluster, créez un cluster.
Important
Vérifiez que le cluster dispose de la version 15.4 d’exécution supérieure, qui prend en charge à long terme Spark 3.5.0 et Scala 2.12. Les étapes restantes de ce guide supposent ces versions des outils.
Pour installer un package Maven, accédez à Bibliothèques>Installer Nouveau>, puis Maven.
Recherchez le connecteur Spark pour Azure Cosmos DB pour NoSQL en utilisant le filtre ID de groupe
com.azure.cosmos.sparket sélectionnez le package qui a un ID d’artefact deazure-cosmos-spark_3-5_2-12.Créez un bloc-notes en accédant à Workspace>[Folder]>New>Notebook.
Attachez le bloc-notes à votre cluster.
Configurer le connecteur Spark dans Azure Databricks
Configurez le connecteur Spark pour vous connecter au conteneur de votre compte à l’aide de l’authentification Microsoft Entra. En outre, configurez le connecteur pour qu’il utilise uniquement un seuil limité de débit pour les opérations Spark. Pour configurer le connecteur Spark, définissez un dictionnaire de configuration avec des informations d’identification pour vous connecter à votre compte. Ces informations d’identification sont les suivantes :
| Valeur | |
|---|---|
spark.cosmos.accountEndpoint |
Point de terminaison du compte NoSQL |
spark.cosmos.database |
Nom de la base de données cible |
spark.cosmos.container |
Nom du conteneur cible |
spark.cosmos.auth.type |
ManagedIdentity |
spark.cosmos.auth.aad.clientId |
ID client de l’identité managée attribuée par l’utilisateur |
spark.cosmos.account.subscriptionId |
ID de l’abonnement |
spark.cosmos.account.tenantId |
L'ID du client Microsoft Entra associé |
spark.cosmos.account.resourceGroupName |
nom du groupe de ressources |
spark.cosmos.throughputControl.enabled |
true |
spark.cosmos.throughputControl.name |
TargetContainerThroughputControl |
spark.cosmos.throughputControl.targetThroughputThreshold |
0.30 |
spark.cosmos.throughputControl.globalControl.useDedicatedContainer |
`false |
cosmos_config = {
# General settings
"spark.cosmos.accountEndpoint": "<endpoint>",
"spark.cosmos.database": "products",
"spark.cosmos.container": "recommendations",
# Entra authentication settings
"spark.cosmos.auth.type": "ManagedIdentity",
"spark.cosmos.account.subscriptionId": "<subscriptionId>",
"spark.cosmos.account.tenantId": "<tenantId>",
"spark.cosmos.account.resourceGroupName": "<resourceGroupName>",
# Throughput control settings
"spark.cosmos.throughputControl.enabled": "true",
"spark.cosmos.throughputControl.name": "TargetContainerThroughputControl",
"spark.cosmos.throughputControl.targetThroughputThreshold": "0.30",
"spark.cosmos.throughputControl.globalControl.useDedicatedContainer": "false",
}
val cosmosconfig = Map(
// General settings
"spark.cosmos.accountEndpoint" -> "<endpoint>",
"spark.cosmos.database" -> "products",
"spark.cosmos.container" -> "recommendations",
// Entra authentication settings
"spark.cosmos.auth.type" -> "ManagedIdentity",
"spark.cosmos.account.subscriptionId" -> "<subscriptionId>",
"spark.cosmos.account.tenantId" -> "<tenantId>",
"spark.cosmos.account.resourceGroupName" -> "<resourceGroupName>",
// Throughput control settings
"spark.cosmos.throughputControl.enabled" -> "true",
"spark.cosmos.throughputControl.name" -> "TargetContainerThroughputControl",
"spark.cosmos.throughputControl.targetThroughputThreshold" -> "0.30",
"spark.cosmos.throughputControl.globalControl.useDedicatedContainer" -> "false"
)
Note
Dans cet exemple, la base de données cible est nommée products et le conteneur cible est nommé recommendations.
La configuration du débit, telle que spécifiée dans cette étape, garantit que seules 30% des unités de requête allouées au conteneur cible sont disponibles pour les opérations Spark.
Ingérer des exemples de données de recommandations de produit dans une table Delta
Créez un exemple de DataFrame avec des informations sur les recommandations de produit pour les utilisateurs et écrivez-le dans une table Delta nommée recommendations_delta. Cette étape simule les données organisées et transformées dans votre lac de données que vous envisagez de synchroniser avec Azure Cosmos DB pour NoSQL. L’écriture au format Delta garantit que vous pouvez activer ultérieurement la capture de données modifiées (CDC) pour la synchronisation incrémentielle.
from pyspark.sql import SparkSession
# Create sample data and convert it to a DataFrame
df = spark.createDataFrame([
("yara-lima", "Full-Finger Gloves", "clothing-gloves", 80),
("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 90)
], ["id", "productname", "category", "recommendationscore"])
# Write the DataFrame to a Delta table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")
// Create sample data as a sequence and convert it to a DataFrame
val df = Seq(
("yara-lima", "Full-Finger Gloves", "clothing-gloves", 12.95),
("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 19.99)
).toDF("id", "productname", "category", "recommendationscore")
// Write the DataFrame to a table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")
Chargement par lots de données initiales dans Azure Cosmos DB pour NoSQL
Ensuite, lisez la recommendations_delta table Delta dans un DataFrame Spark et effectuez une écriture par lots initiale dans Azure Cosmos DB pour NoSQL à l’aide du cosmos.oltp format. Utilisez le mode d’ajout pour ajouter les données sans remplacer le contenu existant dans la base de données et le conteneur cible. Cette étape garantit que toutes les données historiques sont disponibles sur le compte avant que la capture des changements de données (CDC) ne commence.
# Read the Delta table into a DataFrame
df_delta = spark.read.format("delta").table("recommendations_delta")
# Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Read the Delta table into a DataFrame
val df_delta = spark.read.format("delta").table("recommendations_delta")
// Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(cosmosconfig).save()
Activer la synchronisation en flux continu avec le flux de données de changement
Activez la fonctionnalité CDF (Change Data Feed) de Delta Lake sur la recommendations_delta table en modifiant les propriétés de la table. CDF permet à Delta Lake de suivre toutes les futures insertions, mises à jour et suppressions au niveau des lignes. L’activation de cette propriété est essentielle pour effectuer des synchronisations incrémentielles avec Azure Cosmos DB pour NoSQL, car elle expose des modifications sans avoir à comparer les instantanés.
Une fois le chargement des données historiques effectué, les modifications apportées à la table Delta peuvent être capturées à l’aide du flux de données modifiées Delta (CDF). Vous pouvez implémenter une CDC basée sur le traitement par lots ou sur la diffusion en continu.
# Enable Change Data Feed (CDF)
spark.sql("""
ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# Read the Change Data Capture (CDC) data from the Delta table
cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")
# Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Enable Change Data Feed (CDF)
spark.sql("""
ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
// Read the Change Data Capture (CDC) data from the Delta table
val cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")
// Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(cosmos_config).save()
Vérifier les données à l’aide de requêtes NoSQL
Après avoir écrit dans Azure Cosmos DB pour NoSQL, vérifiez les données en les interrogeant dans Spark à l’aide de la même configuration de compte. Alors; inspectez les données ingérées, exécutez des validations ou joignez-vous à d’autres jeux de données dans Delta Lake pour l’analytique ou la création de rapports. Azure Cosmos DB pour NoSQL prend en charge les lectures rapides et indexées pour les performances des requêtes en temps réel.
# Load DataFrame
df_cosmos = spark.read.format("cosmos.oltp").options(**cosmos_config).load()
# Run query
df_cosmos.select("id", "productname", "category", "recommendationscore").show()
// Load DataFrame
val dfCosmos = spark.read.format("cosmos.oltp").options(cosmosConfig).load()
// Run query
dfCosmos.select("id", "productname", "category", "recommendationscore").show()