Bagikan melalui


Tutorial: Mengekstrak balik, mengubah, & memuat (ETL) dari Delta Lake ke Azure Cosmos DB untuk NoSQL dengan konektor Spark OLTP

Dalam tutorial ini, Anda menyiapkan alur ETL terbalik untuk memindahkan data yang diperkaya dari tabel Delta di Azure Databricks ke Azure Cosmos DB untuk NoSQL. Anda kemudian menggunakan konektor Spark Pemrosesan Transaksi Online (OLTP) untuk Azure Cosmos DB untuk NoSQL guna menyinkronkan data.

Prasyarat untuk penyiapan alur Reverse ETL

  • Akun Azure Cosmos DB yang sudah ada.
    • Jika Anda memiliki langganan Azure, buat akun baru.
  • Ruang kerja Azure Databricks yang sudah ada.
  • Versi terbaru Azure CLI.

Mengonfigurasi kontrol akses berbasis peran dengan Microsoft Entra

Identitas terkelola Azure memastikan autentikasi tanpa kata sandi yang aman ke Azure Cosmos DB for NoSQL tanpa mengelola kredensial secara manual. Dalam langkah prasyarat ini, siapkan identitas terkelola yang ditetapkan pengguna, yang secara otomatis dibuat oleh Azure Databricks, dengan akses baca ke metadata dan akses tulis ke data untuk akun Azure Cosmos DB for NoSQL Anda. Langkah ini mengonfigurasi peran kontrol dan kontrol akses berbasis peran bidang data untuk identitas terkelola.

  1. Masuk ke portal Azure (https://portal.azure.com).

  2. Navigasikan ke sumber daya Azure Databricks yang ada.

  3. Di panel Esensial , temukan dan navigasikan ke grup sumber daya terkelola yang terkait dengan ruang kerja.

  4. Di grup sumber daya terkelola, pilih identitas terkelola yang ditetapkan pengguna yang dibuat secara otomatis dengan ruang kerja.

  5. Catat nilai bidang ID Klien dan ID Objek (utama) di panel Esensial . Anda menggunakan nilai ini nanti untuk menetapkan peran kontrol dan bidang data.

    Petunjuk / Saran

    Atau, Anda bisa mendapatkan ID utama identitas terkelola menggunakan Azure CLI. Dengan asumsi bahwa nama identitas terkelola adalah dbmanagedidentity, gunakan az resource show perintah untuk mendapatkan ID utama.

    az resource show \
        --resource-group "<name-of-managed-resource-group>" \
        --name "dbmanagedidentity" \
        --resource-type "Microsoft.ManagedIdentity/userAssignedIdentities" \
        --query "{clientId: properties.clientId, principalId: properties.principalId}"
    
  6. Navigasikan ke target akun Azure Cosmos DB for NoSQL.

  7. Pada halaman akun, pilih Kontrol akses (IAM).

  8. Di panel Kontrol akses, pilih Tambahkan dan kemudian opsi Tambahkan penetapan peran untuk memulai proses penetapan peran lapisan kontrol ke identitas terkelola yang ditetapkan pengguna.

  9. Pilih peran Pembaca Akun Cosmos DB dalam daftar peran untuk penugasan.

  10. Di bagian untuk menetapkan akses ke pengguna, grup, atau perwakilan layanan gunakan opsi pilih anggota .

  11. Dalam dialog anggota, masukkan ID prinsipal untuk memfilter ke identitas terkelola yang ditetapkan pengguna yang terkait dengan Azure Databricks. Pilih identitas tersebut.

  12. Terakhir, pilih Tinjau + Tetapkan untuk membuat penetapan peran sarana kontrol.

  13. Gunakan perintah az cosmosdb sql role assignment create untuk menetapkan peran bidang data Cosmos DB Built-in Data Contributor dan cakupan / ke identitas terkelola yang ditetapkan pengguna yang terkait dengan Azure Databricks.

    az cosmosdb sql role assignment create \
        --resource-group "<name-of-resource-group>" \
        --account-name "<name-of-cosmos-nosql-account>" \
        --principal-id "<managed-identity-principal-id>" \
        --role-definition-name "Cosmos DB Built-in Data Contributor" \ --scope "/"
    
  14. Gunakan az account show untuk mendapatkan pengidentifikasi langganan dan penyewa Anda. Nilai-nilai ini diperlukan di langkah selanjutnya dengan konektor Spark menggunakan autentikasi Microsoft Entra.

    az account show --query '{subscriptionId: id, tenantId: tenantId}'
    

Membuat buku catatan Databricks

  1. Navigasikan ke sumber daya Azure Databricks yang ada lalu buka UI ruang kerja.

  2. Jika Anda belum memiliki kluster, buat kluster baru.

    Penting

    Pastikan kluster memiliki Runtime versi 15.4 yang lebih tinggi yang memiliki dukungan jangka panjang untuk Spark 3.5.0 dan Scala 2.12. Langkah-langkah yang tersisa dalam panduan ini mengasumsikan versi alat ini.

  3. Arahkan ke Pustaka>Pasang Baru> dan Maven untuk memasang paket Maven.

  4. Mencari konektor Spark untuk Azure Cosmos DB untuk NoSQL dengan menggunakan filter ID Grupcom.azure.cosmos.spark dan memilih paket dengan ID Artefak dari azure-cosmos-spark_3-5_2-12.

  5. Buat buku catatan baru dengan menavigasi ke Ruang> Kerja[Folder]>Buku Catatan>.

  6. Lampirkan buku catatan ke kluster Anda.

Mengonfigurasi konektor Spark di Azure Databricks

Konfigurasikan konektor Spark untuk menyambungkan ke kontainer akun Anda menggunakan autentikasi Microsoft Entra. Selain itu, konfigurasikan konektor untuk hanya menggunakan ambang batas throughput terbatas untuk operasi Spark. Untuk mengonfigurasi konektor spark, tentukan kamus konfigurasi dengan kredensial untuk menyambungkan ke akun Anda. Kredensial ini meliputi:

Nilai
spark.cosmos.accountEndpoint Titik akhir akun NoSQL
spark.cosmos.database Nama database target
spark.cosmos.container Nama kontainer target
spark.cosmos.auth.type ManagedIdentity
spark.cosmos.auth.aad.clientId ID Klien dari identitas terkelola yang ditetapkan pengguna
spark.cosmos.account.subscriptionId ID langganan
spark.cosmos.account.tenantId ID penyewa Microsoft Entra terkait
spark.cosmos.account.resourceGroupName Nama grup sumber daya
spark.cosmos.throughputControl.enabled true
spark.cosmos.throughputControl.name TargetContainerThroughputControl
spark.cosmos.throughputControl.targetThroughputThreshold 0.30
spark.cosmos.throughputControl.globalControl.useDedicatedContainer 'false
cosmos_config = {
    # General settings
    "spark.cosmos.accountEndpoint": "<endpoint>",
    "spark.cosmos.database": "products",
    "spark.cosmos.container": "recommendations",
    # Entra authentication settings
    "spark.cosmos.auth.type": "ManagedIdentity",
    "spark.cosmos.account.subscriptionId": "<subscriptionId>",
    "spark.cosmos.account.tenantId": "<tenantId>",
    "spark.cosmos.account.resourceGroupName": "<resourceGroupName>",
    # Throughput control settings
    "spark.cosmos.throughputControl.enabled": "true",
    "spark.cosmos.throughputControl.name": "TargetContainerThroughputControl",
    "spark.cosmos.throughputControl.targetThroughputThreshold": "0.30",
    "spark.cosmos.throughputControl.globalControl.useDedicatedContainer": "false",
}
val cosmosconfig = Map(
  // General settings
  "spark.cosmos.accountEndpoint" -> "<endpoint>",
  "spark.cosmos.database" -> "products",
  "spark.cosmos.container" -> "recommendations",
  // Entra authentication settings
  "spark.cosmos.auth.type" -> "ManagedIdentity",
  "spark.cosmos.account.subscriptionId" -> "<subscriptionId>",
  "spark.cosmos.account.tenantId" -> "<tenantId>",
  "spark.cosmos.account.resourceGroupName" -> "<resourceGroupName>",
  // Throughput control settings
  "spark.cosmos.throughputControl.enabled" -> "true",
  "spark.cosmos.throughputControl.name" -> "TargetContainerThroughputControl",
  "spark.cosmos.throughputControl.targetThroughputThreshold" -> "0.30",
  "spark.cosmos.throughputControl.globalControl.useDedicatedContainer" -> "false"
)

Nota

Dalam sampel ini, database target diberi nama products dan kontainer target diberi nama recommendations.

Konfigurasi throughput, seperti yang ditentukan dalam langkah ini, memastikan bahwa hanya 30% unit permintaan (RU) yang dialokasikan untuk kontainer target yang tersedia untuk operasi Spark.

Memasukkan data rekomendasi produk sampel ke Tabel Delta

Buat contoh DataFrame dengan informasi rekomendasi produk untuk pengguna dan tulis ke dalam tabel Delta bernama recommendations_delta. Langkah ini mensimulasikan data yang dikumpulkan dan diubah di data lake yang ingin Anda sinkronkan ke Azure Cosmos DB for NoSQL. Menulis ke format Delta memastikan Anda nantinya dapat mengaktifkan change data capture (CDC) untuk sinkronisasi inkremental.

from pyspark.sql import SparkSession

# Create sample data and convert it to a DataFrame
df = spark.createDataFrame([
    ("yara-lima", "Full-Finger Gloves", "clothing-gloves", 80),
    ("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 90)
], ["id", "productname", "category", "recommendationscore"])

# Write the DataFrame to a Delta table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")
// Create sample data as a sequence and convert it to a DataFrame
val df = Seq(
  ("yara-lima", "Full-Finger Gloves", "clothing-gloves", 12.95),
  ("elza-pereira", "Long-Sleeve Logo Jersey", "clothing-jerseys", 19.99)
).toDF("id", "productname", "category", "recommendationscore") 

// Write the DataFrame to a table
df.write.mode("append").format("delta").saveAsTable("recommendations_delta")

Batch memuat data awal ke Azure Cosmos DB untuk NoSQL

Selanjutnya, baca recommendations_delta tabel Delta ke dalam Spark DataFrame dan lakukan penulisan batch awal ke Azure Cosmos DB untuk NoSQL menggunakan cosmos.oltp format . Gunakan mode penambahan untuk menambahkan data tanpa menimpa konten yang ada dalam database dan kontainer target. Langkah ini memastikan bahwa semua data historis tersedia di akun sebelum CDC dimulai.

# Read the Delta table into a DataFrame
df_delta = spark.read.format("delta").table("recommendations_delta")

# Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Read the Delta table into a DataFrame
val df_delta = spark.read.format("delta").table("recommendations_delta")

// Write the DataFrame to the container using the Cosmos OLTP format
df_delta.write.format("cosmos.oltp").mode("append").options(cosmosconfig).save()

Mengaktifkan sinkronisasi streaming dengan mengubah umpan data

Aktifkan fitur Umpan Data Perubahan (CDF) Delta Lake pada recommendations_delta tabel dengan mengubah properti tabel. CDF memungkinkan Delta Lake menelusuri semua penyisipan, pembaruan, dan penghapusan tingkat baris di masa depan. Mengaktifkan properti ini sangat penting untuk melakukan sinkronisasi inkremental ke Azure Cosmos DB untuk NoSQL, karena mengekspos perubahan tanpa perlu membandingkan rekam jepret.

Setelah pemuatan data historis, perubahan dalam tabel Delta dapat diambil menggunakan Delta Change Data Feed (CDF). Anda dapat menerapkan CDC berbasis batch atau berbasis streaming.

# Enable Change Data Feed (CDF)
spark.sql("""
  ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# Read the Change Data Capture (CDC) data from the Delta table
cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")

# Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(**cosmos_config).save()
// Enable Change Data Feed (CDF)
spark.sql("""
  ALTER TABLE recommendations_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

// Read the Change Data Capture (CDC) data from the Delta table
val cdc_batch_df = spark.read.format("delta").option("readChangeData", "true").option("startingVersion", "1").table("recommendations_delta")

// Write the captured changes to Azure Cosmos DB for NoSQL in append mode
cdc_batch_df.write.format("cosmos.oltp").mode("append").options(cosmos_config).save()

Memverifikasi data menggunakan kueri NoSQL

Setelah menulis ke Azure Cosmos DB for NoSQL, verifikasi data dengan mengkuerinya kembali ke Spark menggunakan konfigurasi akun yang sama. Kemudian; periksa data yang diserap, jalankan validasi, atau bergabung dengan himpunan data lain di Delta Lake untuk analitik atau pelaporan. Azure Cosmos DB for NoSQL mendukung pembacaan yang cepat dan terindeks untuk performa kueri real time.

# Load DataFrame
df_cosmos = spark.read.format("cosmos.oltp").options(**cosmos_config).load()

# Run query
df_cosmos.select("id", "productname", "category", "recommendationscore").show()
// Load DataFrame
val dfCosmos = spark.read.format("cosmos.oltp").options(cosmosConfig).load()

// Run query
dfCosmos.select("id", "productname", "category", "recommendationscore").show()