Freigeben über


Tutorial: Reverse-ETL (Extract, Transform and Load) von Delta Lake zu Azure Cosmos DB for NoSQL mit Spark OLTP-Connector

In diesem Lernprogramm richten Sie eine reverse ETL-Pipeline ein, um angereicherte Daten aus Delta-Tabellen in Azure Databricks nach Azure Cosmos DB für NoSQL zu verschieben. Anschließend verwenden Sie den OLTP-Spark-Connector (Online Transaction Processing) für Azure Cosmos DB für NoSQL, um Daten zu synchronisieren.

Voraussetzungen für Reverse ETL-Pipeline-Setup

Konfigurieren der rollenbasierten Zugriffssteuerung mit Microsoft Entra

Von Azure verwaltete Identitäten stellen eine sichere, kennwortlose Authentifizierung für Azure Cosmos DB für NoSQL sicher, ohne Anmeldeinformationen manuell zu verwalten. Richten Sie in diesem erforderlichen Schritt die benutzerseitig zugewiesene verwaltete Identität ein, die Azure Databricks automatisch mit Lesezugriff auf Metadaten und Schreibzugriff auf Daten für Ihr Azure Cosmos DB for NoSQL-Konto erstellt. In diesem Schritt werden rollenbasierte Zugriffssteuerungsrollen für die verwaltete Identität für die Steuerungsebene und die Datenebene konfiguriert.

  1. Melden Sie sich beim Azure-Portal (https://portal.azure.com) an.

  2. Navigieren Sie zur vorhandenen Azure Databricks-Ressource.

  3. Suchen und navigieren Sie im Bereich "Essentials " zur verwalteten Ressourcengruppe, die dem Arbeitsbereich zugeordnet ist.

  4. Wählen Sie in der Gruppe verwaltete Ressourcen die vom Benutzer zugewiesene verwaltete Identität aus, die automatisch mit dem Arbeitsbereich erstellt wurde.

  5. Notieren Sie den Wert der Felder "Client-ID " und " Objekt(Prinzipal)" im Bereich "Essentials ". Sie verwenden diesen Wert später, um Steuer- und Datenebenenrollen zuzuweisen.

    Tipp

    Alternativ können Sie die Prinzipal-ID der verwalteten Identität mithilfe der Azure CLI abrufen. Wenn der Name der verwalteten Identität lautet dbmanagedidentity, verwenden Sie den az resource show Befehl, um die Prinzipal-ID abzurufen.

    az resource show \
        --resource-group "<name-of-managed-resource-group>" \
        --name "dbmanagedidentity" \
        --resource-type "Microsoft.ManagedIdentity/userAssignedIdentities" \
        --query "{clientId: properties.clientId, principalId: properties.principalId}"
    
  6. Navigieren Sie zum Zielkonto für Azure Cosmos DB für NoSQL.

  7. Wählen Sie auf der Seite des Kontos zugriffssteuerung (IAM) aus.

  8. Wählen Sie im Access-Steuerelementbereich die Option "Hinzufügen" und dann die Optionen zum Hinzufügen von Rollenzuweisungen aus, um mit dem Vorgang zu beginnen, eine Steuerelementebenenrolle der vom Benutzer zugewiesenen verwalteten Identität zuzuweisen.

  9. Wählen Sie die Rolle "Cosmos DB Account Reader " in der Liste der Rollen für die Zuweisung aus.

  10. Im Abschnitt zum Zuweisen des Zugriffs auf einen Benutzer, eine Gruppe oder einen Dienstprinzipal interagieren Sie mit der Option "Ausgewählte Mitglieder ".

  11. Geben Sie im Dialogfeld "Mitglieder" die Prinzipal-ID ein, um nach der vom Benutzer zugewiesenen verwalteten Identität zu filtern, die Azure Databricks zugeordnet ist. Wählen Sie diese Identität aus.

  12. Wählen Sie schließlich "Überprüfen+ Zuweisen " aus, um die Rollenzuweisung der Steuerebene zu erstellen.

  13. Verwenden Sie den az cosmosdb sql role assignment create Befehl, um die Rolle der Cosmos DB Built-in Data Contributor Datenebene und den / Bereich der vom Benutzer zugewiesenen verwalteten Identität zuzuweisen, die Azure Databricks zugeordnet ist.

    az cosmosdb sql role assignment create \
        --resource-group "<name-of-resource-group>" \
        --account-name "<name-of-cosmos-nosql-account>" \
        --principal-id "<managed-identity-principal-id>" \
        --role-definition-name "Cosmos DB Built-in Data Contributor" \ --scope "/"
    
  14. Verwenden Sie az account show, um Ihre Abonnement- und Mandanten-IDs abzurufen. Diese Werte sind in einem späteren Schritt mit dem Spark-Connector mithilfe der Microsoft Entra-Authentifizierung erforderlich.

    az account show --query '{subscriptionId: id, tenantId: tenantId}'
    

Erstellen eines Databricks-Notizbuchs

  1. Navigieren Sie zu der vorhandenen Azure Databricks-Ressource, und öffnen Sie dann die Arbeitsbereichsbenutzeroberfläche.

  2. Wenn Sie noch keinen Cluster haben, erstellen Sie einen neuen Cluster.

    Von Bedeutung

    Stellen Sie sicher, dass der Cluster über die Runtime-Version 15.4 mit langfristiger Unterstützung für Spark 3.5.0 und Scala 2.12 verfügt. Bei den verbleibenden Schritten in diesem Handbuch wird davon ausgegangen, dass diese Versionen der Tools verwendet werden.

  3. Navigieren Sie zu "Libraries>Install New> " und "Maven ", um ein Maven-Paket zu installieren.

  4. Suchen Sie mithilfe des Gruppen-ID-Filterscom.azure.cosmos.spark nach dem Spark-Connector für Azure Cosmos DB für NoSQL, und wählen Sie das Paket mit einer Artefakt-ID von azure-cosmos-spark_3-5_2-12.

  5. Erstellen Sie ein neues Notizbuch, indem Sie zu "Arbeitsbereich>[Ordner]>Neues>Notizbuch" navigieren.

  6. Fügen Sie das Notizbuch an Ihren Cluster an.

Konfigurieren des Spark-Connectors in Azure Databricks

Konfigurieren Sie den Spark-Connector, um mithilfe der Microsoft Entra-Authentifizierung eine Verbindung mit dem Container Ihres Kontos herzustellen. Konfigurieren Sie außerdem den Connector so, dass nur ein begrenzter Durchsatzschwellenwert für Spark-Vorgänge verwendet wird. Um den Spark Connector zu konfigurieren, definieren Sie ein Konfigurationswörterbuch mit Anmeldeinformationen, um eine Verbindung mit Ihrem Konto herzustellen. Zu diesen Anmeldeinformationen gehören:

Wert
spark.cosmos.accountEndpoint Der NoSQL-Kontoendpunkt
spark.cosmos.database Der Name der Zieldatenbank
spark.cosmos.container Der Name des Zielcontainers
spark.cosmos.auth.type ManagedIdentity
spark.cosmos.auth.aad.clientId Die Client-ID der vom Benutzer zugewiesenen verwalteten Identität
spark.cosmos.account.subscriptionId Die ID des Abonnements
spark.cosmos.account.tenantId Die ID des zugeordneten Microsoft Entra-Mandanten
spark.cosmos.account.resourceGroupName Der Name der Ressourcengruppe
spark.cosmos.throughputControl.enabled true
spark.cosmos.throughputControl.name TargetContainerThroughputControl
spark.cosmos.throughputControl.targetThroughputThreshold 0.30
spark.cosmos.throughputControl.globalControl.useDedicatedContainer FALSE
cosmos_config = {
    # General settings
    "spark.cosmos.accountEndpoint": "<endpoint>",
    "spark.cosmos.database": "products",
    "spark.cosmos.container": "recommendations",
    # Entra authentication settings
    "spark.cosmos.auth.type": "ManagedIdentity",
    "spark.cosmos.account.subscriptionId": "<subscriptionId>",
    "spark.cosmos.account.tenantId": "<tenantId>",
    "spark.cosmos.account.resourceGroupName": "<resourceGroupName>",
    # Throughput control settings
    "spark.cosmos.throughputControl.enabled": "true",
    "spark.cosmos.throughputControl.name": "TargetContainerThroughputControl",
    "spark.cosmos.throughputControl.targetThroughputThreshold": "0.30",
    "spark.cosmos.throughputControl.globalControl.useDedicatedContainer": "false",
}
val cosmosconfig = Map(
  // General settings
  "spark.cosmos.accountEndpoint" -> "<endpoint>",
  "spark.cosmos.database" -> "products",
  "spark.cosmos.container" -> "recommendations",
  // Entra authentication settings
  "spark.cosmos.auth.type" -> "ManagedIdentity",
  "spark.cosmos.account.subscriptionId" -> "<subscriptionId>",
  "spark.cosmos.account.tenantId" -> "<tenantId>",
  "spark.cosmos.account.resourceGroupName" -> "<resourceGroupName>",
  // Throughput control settings
  "spark.cosmos.throughputControl.enabled" -> "true",
  "spark.cosmos.throughputControl.name" -> "TargetContainerThroughputControl",
  "spark.cosmos.throughputControl.targetThroughputThreshold" -> "0.30",
  "spark.cosmos.throughputControl.globalControl.useDedicatedContainer" -> "false"
)

Hinweis

In diesem Beispiel wird die Zieldatenbank benannt products , und der Zielcontainer wird benannt recommendations.

Die in diesem Schritt angegebene Durchsatzkonfiguration stellt sicher, dass nur 30% der Anforderungseinheiten (RUs), die dem Zielcontainer zugeordnet sind, für Spark-Vorgänge verfügbar sind.

Nehmen Sie Empfehlungsdaten für Beispielprodukte in eine Delta-Tabelle auf

Erstellen Sie ein DataFrame-Beispiel mit Produktempfehlungeninformationen für Benutzer, und schreiben Sie ihn in eine Delta-Tabelle mit dem Namen recommendations_delta. In diesem Schritt werden kuratierte, transformierte Daten in Ihrem Datensee simuliert, die Sie mit Azure Cosmos DB für NoSQL synchronisieren möchten. Das Schreiben in das Delta-Format stellt sicher, dass Sie später die Änderungsdatenerfassung (CDC) für die inkrementelle Synchronisierung aktivieren können.

from pyspark.sql import SparkSession

# Create sample data and convert it to a DataFrame
df = spark.createDataFrame([
    ("yara-lima", "Full-Finger Gloves", "clothing-gloves", 80),
    ("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 90)
], ["id", "productname", "category", "recommendationscore"])

# Write the DataFrame to a Delta table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")
// Create sample data as a sequence and convert it to a DataFrame
val df = Seq(
  ("yara-lima", "Full-Finger Gloves", "clothing-gloves", 12.95),
  ("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 19.99)
).toDF("id", "productname", "category", "recommendationscore") 

// Write the DataFrame to a table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")

Batchladen der Anfangsdaten in Azure Cosmos DB in einer NoSQL-Umgebung

Lesen Sie als Nächstes die recommendations_delta Delta-Tabelle in einen Spark DataFrame und führen Sie mit dem cosmos.oltp Format einen ersten Batchschreibvorgang in Azure Cosmos DB für NoSQL durch. Verwenden Sie den Anfügemodus , um die Daten hinzuzufügen, ohne vorhandene Inhalte in der Zieldatenbank und im Container zu überschreiben. Dieser Schritt stellt sicher, dass alle historischen Daten im Konto verfügbar sind, bevor CDC beginnt.

# Read the Delta table into a DataFrame
df_delta = spark.read.format("delta").table("recommendations_delta")

# Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Read the Delta table into a DataFrame
val df_delta = spark.read.format("delta").table("recommendations_delta")

// Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(cosmosconfig).save()

Aktivieren der Streamingsynchronisierung mit Änderungsdatenfeed

Aktivieren Sie die Funktion "Change Data Feed (CDF) von Delta Lake" in der recommendations_delta Tabelle, indem Sie die Eigenschaften der Tabelle ändern. CDF ermöglicht Es Delta Lake, alle zukünftigen Einfügungen, Aktualisierungen und Löschungen auf Zeilenebene nachzuverfolgen. Das Aktivieren dieser Eigenschaft ist für das Ausführen inkrementeller Synchronisierungen mit Azure Cosmos DB für NoSQL unerlässlich, da sie Änderungen verfügbar macht, ohne Momentaufnahmen vergleichen zu müssen.

Nach dem Laden der historischen Daten können Änderungen in der Delta-Tabelle mithilfe des Delta-Änderungsdatenfeeds (DELTA Change Data Feed, CDF) erfasst werden. Sie können entweder batchbasiertes oder streamingbasiertes CDC implementieren.

# Enable Change Data Feed (CDF)
spark.sql("""
  ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read the Change Data Capture (CDC) data from the Delta table
cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")

# Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Enable Change Data Feed (CDF)
spark.sql("""
  ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

// Read the Change Data Capture (CDC) data from the Delta table
val cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")

// Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(cosmos_config).save()

Überprüfen von Daten mithilfe von NoSQL-Abfragen

Überprüfen Sie nach dem Schreiben in Azure Cosmos DB für NoSQL die Daten, indem Sie sie mithilfe derselben Kontokonfiguration wieder in Spark abfragen. Dann überprüfen Sie die aufgenommenen Daten, führen Sie Validierungen durch oder verknüpfen Sie die Datensätze im Delta Lake für Analysen oder Berichterstellung. Azure Cosmos DB für NoSQL unterstützt schnelle, indizierte Lesevorgänge für die Echtzeitabfrageleistung.

# Load DataFrame
df_cosmos = spark.read.format("cosmos.oltp").options(**cosmos_config).load()

# Run query
df_cosmos.select("id", "productname", "category", "recommendationscore").show()
// Load DataFrame
val dfCosmos = spark.read.format("cosmos.oltp").options(cosmosConfig).load()

// Run query
dfCosmos.select("id", "productname", "category", "recommendationscore").show()