Berinteraksi dengan Azure Cosmos DB menggunakan Apache Spark 3 di Azure Synapse Link

Dalam artikel ini, Anda akan mempelajari cara berinteraksi dengan Azure Cosmos DB menggunakan Synapse Apache Spark 3. Dengan dukungan penuh untuk Scala, Python, SparkSQL, dan C#, Synapse Apache Spark 3 berperan sentral untuk analitik, rekayasa data, sains data, dan skenario eksplorasi data di Azure Synapse Link untuk Azure Cosmos DB.

Kemampuan berikut didukung saat berinteraksi dengan Azure Cosmos DB:

  • Synapse Apache Spark 3 memungkinkan Anda menganalisis data dalam kontainer Azure Cosmos DB yang diaktifkan Azure Synapse Link dalam waktu hampir real-time tanpa memengaruhi performa beban kerja transaksional Anda. Dua opsi berikut ini tersedia untuk kueri penyimpanan analitik Azure Cosmos DB dari Spark:
    • Muatkan ke Spark DataFrame
    • Buat tabel Spark
  • Synapse Apache Spark juga memungkinkan Anda untuk menyerap data ke Azure Cosmos DB. Penting untuk dicatat bahwa data selalu diserap ke dalam kontainer Azure Cosmos DB melalui penyimpanan transaksional. Saat Synapse Link diaktifkan, setiap sisipan, pembaruan, dan penghapusan baru secara otomatis disinkronkan ke penyimpanan analitis.
  • Synapse Apache Spark juga mendukung streaming terstruktur Spark dengan Azure Cosmos DB sebagai sumber serta sink.

Bagian berikut memandu Anda melalui sintaks kemampuan di atas. Anda juga dapat memeriksa modul Learn tentang cara Mengkueri Azure Cosmos DB dengan Apache Spark untuk Azure Synapse Analytics. Gerakan di ruang kerja Azure Synapse Analytics dirancang untuk memberikan pengalaman out-of-the-box yang mudah dimulai. Gerakan terlihat saat Anda mengeklik kanan pada kontainer Azure Cosmos DB di tab Data ruang kerja Synapse. Dengan gerakan, Anda dapat dengan cepat menghasilkan kode dan menyesuaikannya dengan kebutuhan Anda. Gerakan juga cocok untuk menemukan data dengan satu klik.

Penting

Anda harus mengetahui beberapa kendala dalam skema analitik yang dapat menyebabkan perilaku tak terduga dalam operasi pemuatan data. Sebagai contoh, hanya 1000 properti pertama dari skema transaksional yang tersedia dalam skema analitis, properti dengan ruang tidak tersedia, dll. Jika Anda mengalami beberapa hasil yang tidak terduga, periksa batasan skema penyimpanan analitis untuk detail lebih lanjut.

Kueri penyimpanan analitik Azure Cosmos DB

Sebelum Anda mempelajari tentang dua opsi yang mungkin untuk mengueri penyimpanan analitik Azure Cosmos DB, memuat ke Spark DataFrame dan membuat tabel Spark, ada baiknya menjelajahi perbedaan pengalaman sehingga Anda dapat memilih opsi yang sesuai untuk kebutuhan Anda.

Perbedaan pengalaman ada di sekitar apakah perubahan data yang mendasari kontainer Azure Cosmos DB harus secara otomatis tecermin dalam analisis yang dilakukan di Spark. Saat Spark DataFrame terdaftar atau tabel Spark dibuat terhadap penyimpanan analitik kontainer, metadata di sekitar salinan bayangan data saat ini di penyimpanan analitik diambil ke Spark untuk pushdown yang efisien dari analisis berikutnya. Penting untuk dicatat bahwa karena Spark mengikuti kebijakan evaluasi malas, kecuali tindakan dipanggil pada Spark DataFrame atau kueri SparkSQL dijalankan terhadap tabel Spark, data aktual tidak diambil dari penyimpanan analitik kontainer yang mendasarinya.

Dalam hal pemuatan ke Spark DataFrame, metadata yang diambil di-cache selama masa pakai sesi Spark dan maka tindakan berikutnya yang dipanggil pada DataFrame dievaluasi terhadap salinan bayangan penyimpanan analitik pada saat pembuatan DataFrame.

Di sisi lain, dalam kasus membuat tabel Spark, metadata status penyimpanan analitik tidak di-cache di Spark dan dimuat ulang pada setiap eksekusi kueri Spark SQL terhadap tabel Spark.

Oleh karena itu, Anda dapat memilih antara memuat ke Spark DataFrame dan membuat tabel Spark, berdasarkan apakah Anda ingin analisis Spark dievaluasi terhadap salinan bayangan tetap penyimpanan analitik atau terhadap salinan bayangan terbaru dari penyimpanan analitik.

Catatan

Untuk mengkueri akun Azure Cosmos DB untuk MongoDB, pelajari selengkapnya tentang representasi skema keakuratan penuh di penyimpanan analitik dan nama properti yang diperluas yang akan digunakan.

Catatan

Perhatikan bahwa semua options dalam perintah di bawah ini bersifat peka huruf besar/kecil.

Muatkan ke Spark DataFrame

Dalam contoh ini, Anda akan membuat Spark DataFrame yang menunjuk ke penyimpanan analitik Azure Cosmos DB. Anda kemudian dapat melakukan analisis tambahan dengan memanggil tindakan Spark terhadap DataFrame. Operasi ini tidak berdampak pada penyimpanan transaksional.

Sintaks dalam Python adalah sebagai berikut:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

Sintaks yang setara di Scala adalah sebagai berikut:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

Buat tabel Spark

Dalam contoh ini, Anda akan membuat tabel Spark yang menunjuk penyimpanan analitik Azure Cosmos DB. Anda kemudian dapat melakukan analisis tambahan dengan memanggil kueri SparkSQL terhadap tabel. Operasi ini tidak berdampak pada penyimpanan transaksional atau tidak menimbulkan pergerakan data apa pun. Jika Anda memutuskan untuk menghapus tabel Spark ini, kontainer Azure Cosmos DB yang mendasarinya dan penyimpanan analitik yang sesuai tidak akan terpengaruh.

Skenario ini mudah untuk menggunakan tabel Spark kembali melalui alat pihak ketiga dan menyediakan aksesibilitas ke data yang mendasarinya untuk run-time.

Sintaks untuk membuat tabel Spark adalah sebagai berikut:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

Catatan

Jika Anda memiliki skenario saat skema kontainer Azure Cosmos DB yang mendasarinya berubah seiring waktu; dan jika Anda ingin skema yang diperbarui secara otomatis tercermin dalam kueri terhadap tabel Spark, Anda dapat mencapainya dengan mengatur opsi spark.cosmos.autoSchemaMerge ke true dalam opsi tabel Spark.

Tulis Spark DataFrame ke kontainer Azure Cosmos DB

Dalam contoh ini, Anda akan menulis Spark DataFrame ke dalam kontainer Azure Cosmos DB. Operasi ini akan berdampak pada performa beban kerja transaksional dan mengonsumsi unit permintaan yang disediakan pada kontainer Azure Cosmos DB atau database bersama.

Sintaks dalam Python adalah sebagai berikut:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .mode('append')\
    .save()

Sintaks yang setara di Scala adalah sebagai berikut:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    mode(SaveMode.Append).
    save()

Muat streaming DataFrame dari kontainer

Dalam gerakan ini, Anda akan menggunakan kemampuan Spark Streaming untuk memuat data dari kontainer ke dalam dataframe. Data akan disimpan di akun data lake utama (dan sistem file) yang Anda sambungkan ke ruang kerja.

Catatan

Jika Anda mencari referensi pustaka eksternal di Synapse Apache Spark, pelajari selengkapnya di sini. Misalnya, jika Anda ingin menyerap Spark DataFrame ke kontainer Azure Cosmos DB untuk MongoDB, Anda dapat memanfaatkan konektor MongoDB untuk Spark di sini.

Muat DataFrame streaming dari kontainer Azure Cosmos DB

Dalam contoh ini, Anda akan menggunakan kemampuan streaming terstruktur Spark untuk memuat data dari kontainer Azure Cosmos DB ke dalam Spark streaming DataFrame menggunakan fungsi umpan perubahan di Azure Cosmos DB. Data titik pemeriksaan yang digunakan oleh Spark akan disimpan di akun data lake utama (dan sistem file) yang Anda sambungkan ke ruang kerja.

Sintaks dalam Python adalah sebagai berikut:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp.changeFeed")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.startFrom", "Beginning")\
    .option("spark.cosmos.changeFeed.mode", "Incremental")\
    .load()

Sintaks yang setara di Scala adalah sebagai berikut:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp.changeFeed").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.startFrom", "Beginning").
    option("spark.cosmos.changeFeed.mode", "Incremental").
    load()

Menulis DataFrame streaming ke kontainer Azure Cosmos DB

Dalam contoh ini, Anda akan menulis DataFrame streaming ke dalam kontainer Azure Cosmos DB. Operasi ini akan berdampak pada kinerja beban kerja transaksional dan mengonsumsi Unit Permintaan yang disediakan pada kontainer Azure Cosmos DB atau database bersama. Jika folder /localWriteCheckpointFolder tidak dibuat (dalam contoh di bawah), itu akan dibuat secara otomatis.

Sintaks dalam Python adalah sebagai berikut:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

streamQuery = dfStream\
    .writeStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("checkpointLocation", "/tmp/myRunId/")\
    .outputMode("append")\
    .start()

streamQuery.awaitTermination()

Sintaks yang setara di Scala adalah sebagai berikut:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val query = dfStream.
            writeStream.
            format("cosmos.oltp").
            outputMode("append").
            option("spark.synapse.linkedService", "<enter linked service name>").
            option("spark.cosmos.container", "<enter container name>").
            option("checkpointLocation", "/tmp/myRunId/").
            start()

query.awaitTermination()

Langkah berikutnya