Udostępnij przez


Samouczek: Odwrotne wyodrębnianie, przekształcanie i ładowanie (ETL) z Delta Lake do Azure Cosmos DB dla NoSQL za pomocą łącznika Spark OLTP

W tym samouczku skonfigurujesz odwrotny potok ETL, aby przenieść wzbogacone dane z tabel Delta w usłudze Azure Databricks do usługi Azure Cosmos DB for NoSQL. Następnie użyjesz łącznika Spark do przetwarzania transakcji online (OLTP) dla Azure Cosmos DB for NoSQL, aby zsynchronizować dane.

Wymagania wstępne dotyczące konfiguracji odwrotnego procesu ETL

  • Istniejące konto usługi Azure Cosmos DB.
  • Istniejący obszar roboczy usługi Azure Databricks.
  • Najnowsza wersja interfejsu wiersza polecenia platformy Azure.

Konfigurowanie kontroli dostępu opartej na rolach za pomocą usługi Microsoft Entra

Tożsamości zarządzane platformy Azure zapewniają bezpieczne, bezhasłowe uwierzytelnianie do usługi Azure Cosmos DB for NoSQL bez konieczności ręcznego zarządzania poświadczeniami. W tym kroku wstępnym skonfiguruj tożsamość zarządzaną przypisaną przez użytkownika, którą usługa Azure Databricks automatycznie tworzy, zapewniając dostęp do odczytu do metadanych i dostęp do zapisu do danych dla Twojego konta Azure Cosmos DB for NoSQL. Ten krok umożliwia skonfigurowanie ról kontroli dostępu opartej na rolach i płaszczyzny danych dla tożsamości zarządzanej.

  1. Zaloguj się do witryny Azure Portal (https://portal.azure.com).

  2. Przejdź do istniejącego zasobu usługi Azure Databricks.

  3. W okienku Podstawy znajdź zarządzaną grupę zasobów skojarzona z obszarem roboczym i przejdź do niej.

  4. W zarządzanej grupie zasobów wybierz tożsamość zarządzaną przypisaną przez użytkownika, która została automatycznie utworzona za pomocą obszaru roboczego.

  5. Zapisz wartości pól Identyfikator klienta i Identyfikator obiektu (podmiotu zabezpieczeń) w okienku Podstawowe. Ta wartość będzie później używana do przypisywania ról płaszczyzny danych i kontroli.

    Wskazówka

    Alternatywnie możesz uzyskać identyfikator główny tożsamości zarządzanej przy użyciu Azure CLI. Zakładając, że nazwa tożsamości zarządzanej to dbmanagedidentity, użyj polecenia az resource show, aby uzyskać identyfikator podmiotu zabezpieczeń.

    az resource show \
        --resource-group "<name-of-managed-resource-group>" \
        --name "dbmanagedidentity" \
        --resource-type "Microsoft.ManagedIdentity/userAssignedIdentities" \
        --query "{clientId: properties.clientId, principalId: properties.principalId}"
    
  6. Przejdź do docelowego konta usługi Azure Cosmos DB for NoSQL.

  7. Na stronie konta wybierz pozycję Kontrola dostępu (Zarządzanie dostępem i tożsamością).

  8. W okienku Kontrola dostępu wybierz pozycję Dodaj , a następnie opcje Dodaj przypisanie roli , aby rozpocząć proces przypisywania roli płaszczyzny sterowania do tożsamości zarządzanej przypisanej przez użytkownika.

  9. Wybierz z listy ról do przypisania rolę Czytelnik konta usługi Cosmos DB.

  10. W sekcji przypisywanie dostępu do użytkownika, grupy lub jednostki usługi skorzystaj z opcji wybierz członków.

  11. W oknie dialogowym członków wprowadź identyfikator główny, aby przefiltrować do tożsamości zarządzanej przypisanej przez użytkownika, skojarzonej z usługą Azure Databricks. Wybierz tę tożsamość.

  12. Na koniec wybierz pozycję Przejrzyj i przypisz , aby utworzyć przypisanie roli płaszczyzny sterowania.

  13. Użyj polecenia az cosmosdb sql role assignment create do przypisania roli płaszczyzny danych Cosmos DB Built-in Data Contributor oraz zakresu / tożsamości zarządzanej przypisanej przez użytkownika, powiązanej z platformą Azure Databricks.

    az cosmosdb sql role assignment create \
        --resource-group "<name-of-resource-group>" \
        --account-name "<name-of-cosmos-nosql-account>" \
        --principal-id "<managed-identity-principal-id>" \
        --role-definition-name "Cosmos DB Built-in Data Contributor" \ --scope "/"
    
  14. Użyj az account show, aby uzyskać identyfikatory subskrypcji i dzierżawy. Te wartości są wymagane w późniejszym kroku z łącznikiem Spark przy użyciu uwierzytelniania Microsoft Entra.

    az account show --query '{subscriptionId: id, tenantId: tenantId}'
    

Utwórz notatnik w usłudze Databricks

  1. Przejdź do istniejącego zasobu usługi Azure Databricks, a następnie otwórz interfejs użytkownika obszaru roboczego.

  2. Jeśli nie masz jeszcze klastra, utwórz nowy klaster.

    Ważne

    Upewnij się, że klaster ma środowisko uruchomieniowe w wersji 15.4 nowszej, która ma długoterminową obsługę platform Spark 3.5.0 i Scala 2.12. W pozostałych krokach w tym przewodniku przyjęto w założeniu użycie tych wersji narzędzi.

  3. Przejdź do Biblioteki>Zainstaluj Nowe> i Maven, aby zainstalować pakiet Maven.

  4. Wyszukaj łącznik Spark dla usługi Azure Cosmos DB dla NoSQL przy użyciu filtru Identyfikator grupy, i wybierz pakiet z com.azure.cosmos.spark.

  5. Utwórz nowy notes, przechodząc do Workspace>[Folder]>Nowy>Notes.

  6. Dołącz notatnik do klastra.

Konfigurowanie łącznika Spark w usłudze Azure Databricks

Skonfiguruj łącznik Spark, aby nawiązać połączenie z kontenerem konta przy użyciu uwierzytelniania Microsoft Entra. Ponadto należy skonfigurować łącznik tak, aby używał tylko ograniczonego progu przepływności dla operacji platformy Spark. Aby skonfigurować konektor Spark, zdefiniuj słownik konfiguracji z poświadczeniami do połączenia z Twoim kontem. Te poświadczenia obejmują:

Wartość
spark.cosmos.accountEndpoint Punkt końcowy konta NoSQL
spark.cosmos.database Nazwa docelowej bazy danych
spark.cosmos.container Nazwa kontenera docelowego
spark.cosmos.auth.type ManagedIdentity
spark.cosmos.auth.aad.clientId Identyfikator klienta tożsamości zarządzanej przypisanej użytkownikowi
spark.cosmos.account.subscriptionId Identyfikator subskrypcji
spark.cosmos.account.tenantId Identyfikator skojarzonego tenanta Microsoft Entra
spark.cosmos.account.resourceGroupName Nazwa grupy zasobów
spark.cosmos.throughputControl.enabled true
spark.cosmos.throughputControl.name TargetContainerThroughputControl
spark.cosmos.throughputControl.targetThroughputThreshold 0.30
spark.cosmos.throughputControl.globalControl.useDedicatedContainer fałsz
cosmos_config = {
    # General settings
    "spark.cosmos.accountEndpoint": "<endpoint>",
    "spark.cosmos.database": "products",
    "spark.cosmos.container": "recommendations",
    # Entra authentication settings
    "spark.cosmos.auth.type": "ManagedIdentity",
    "spark.cosmos.account.subscriptionId": "<subscriptionId>",
    "spark.cosmos.account.tenantId": "<tenantId>",
    "spark.cosmos.account.resourceGroupName": "<resourceGroupName>",
    # Throughput control settings
    "spark.cosmos.throughputControl.enabled": "true",
    "spark.cosmos.throughputControl.name": "TargetContainerThroughputControl",
    "spark.cosmos.throughputControl.targetThroughputThreshold": "0.30",
    "spark.cosmos.throughputControl.globalControl.useDedicatedContainer": "false",
}
val cosmosconfig = Map(
  // General settings
  "spark.cosmos.accountEndpoint" -> "<endpoint>",
  "spark.cosmos.database" -> "products",
  "spark.cosmos.container" -> "recommendations",
  // Entra authentication settings
  "spark.cosmos.auth.type" -> "ManagedIdentity",
  "spark.cosmos.account.subscriptionId" -> "<subscriptionId>",
  "spark.cosmos.account.tenantId" -> "<tenantId>",
  "spark.cosmos.account.resourceGroupName" -> "<resourceGroupName>",
  // Throughput control settings
  "spark.cosmos.throughputControl.enabled" -> "true",
  "spark.cosmos.throughputControl.name" -> "TargetContainerThroughputControl",
  "spark.cosmos.throughputControl.targetThroughputThreshold" -> "0.30",
  "spark.cosmos.throughputControl.globalControl.useDedicatedContainer" -> "false"
)

Uwaga / Notatka

W tym przykładzie docelowa baza danych ma nazwę products , a kontener docelowy ma nazwę recommendations.

Konfiguracja przepływności, określona w tym kroku, gwarantuje, że do operacji Spark dostępnych jest tylko 30% jednostek zapytań spośród tych przydzielonych do kontenera docelowego.

Wczytanie przykładowych danych rekomendacji produktów do tabeli Delta

Utwórz przykładową ramkę danych z informacjami o rekomendacjach dotyczących produktów dla użytkowników i zapisz ją w tabeli delty o nazwie recommendations_delta. Ten krok symuluje wyselekcjonowane, przekształcone dane w usłudze Data Lake, które mają być synchronizowane z usługą Azure Cosmos DB for NoSQL. Zapisywanie w formacie delty gwarantuje, że można później włączyć przechwytywanie zmian danych (CDC) na potrzeby synchronizacji przyrostowej.

from pyspark.sql import SparkSession

# Create sample data and convert it to a DataFrame
df = spark.createDataFrame([
    ("yara-lima", "Full-Finger Gloves", "clothing-gloves", 80),
    ("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 90)
], ["id", "productname", "category", "recommendationscore"])

# Write the DataFrame to a Delta table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")
// Create sample data as a sequence and convert it to a DataFrame
val df = Seq(
  ("yara-lima", "Full-Finger Gloves", "clothing-gloves", 12.95),
  ("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 19.99)
).toDF("id", "productname", "category", "recommendationscore") 

// Write the DataFrame to a table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")

Ładowanie początkowych danych wsadowych do usługi Azure Cosmos DB dla NoSQL

Następnie przeczytaj tabelę recommendations_delta Delta w Spark DataFrame i wykonaj początkowy zapis wsadowy do usługi Azure Cosmos DB for NoSQL przy użyciu formatu cosmos.oltp. Użyj trybu dołączania , aby dodać dane bez zastępowania istniejącej zawartości w docelowej bazie danych i kontenerze. Ten krok gwarantuje, że wszystkie dane historyczne są dostępne na koncie przed rozpoczęciem usługi CDC.

# Read the Delta table into a DataFrame
df_delta = spark.read.format("delta").table("recommendations_delta")

# Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Read the Delta table into a DataFrame
val df_delta = spark.read.format("delta").table("recommendations_delta")

// Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(cosmosconfig).save()

Włącz synchronizację przesyłania strumieniowego z kanałem zmiany danych

Włącz funkcję dziennika zmian danych (CDF) usługi Delta Lake w tabeli recommendations_delta, zmieniając jej właściwości. Usługa CDF pozwala Delta Lake śledzić wszystkie przyszłe operacje wstawiania, aktualizowania i usuwania na poziomie wiersza. Włączenie tej właściwości jest niezbędne do przeprowadzania synchronizacji przyrostowych z usługą Azure Cosmos DB for NoSQL, ponieważ uwidacznia zmiany bez konieczności porównywania migawek.

Po załadowaniu danych historycznych zmiany w tabeli delty można przechwycić przy użyciu zestawienia zmian różnicowych (CDF). Można zaimplementować CDC opartą na partiach lub przesyłaniu strumieniowym.

# Enable Change Data Feed (CDF)
spark.sql("""
  ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read the Change Data Capture (CDC) data from the Delta table
cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")

# Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Enable Change Data Feed (CDF)
spark.sql("""
  ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

// Read the Change Data Capture (CDC) data from the Delta table
val cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")

// Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(cosmos_config).save()

Weryfikowanie danych przy użyciu zapytań NoSQL

Po zapisywaniu do Azure Cosmos DB for NoSQL zweryfikuj dane poprzez ponowne wysłanie zapytania do platformy Spark, używając tej samej konfiguracji konta. Następnie sprawdź przetworzone dane, uruchom walidacje lub połącz z innymi zestawami danych w Delta Lake na potrzeby analizy lub raportowania. Usługa Azure Cosmos DB for NoSQL obsługuje szybkie, indeksowane odczyty w celu uzyskania wydajności zapytań w czasie rzeczywistym.

# Load DataFrame
df_cosmos = spark.read.format("cosmos.oltp").options(**cosmos_config).load()

# Run query
df_cosmos.select("id", "productname", "category", "recommendationscore").show()
// Load DataFrame
val dfCosmos = spark.read.format("cosmos.oltp").options(cosmosConfig).load()

// Run query
dfCosmos.select("id", "productname", "category", "recommendationscore").show()