Mengkueri data Cosmos DB dengan Spark

Selesai

Setelah menambahkan layanan tertaut untuk database Azure Cosmos DB yang diaktifkan penyimpanan analitik, Anda dapat menggunakannya untuk mengkueri data menggunakan kumpulan Spark di ruang kerja Azure Synapse Analytics Anda.

Memuat data analitik Azure Cosmos DB ke dalam dataframe

Untuk eksplorasi awal atau analisis cepat data dari layanan tertaut Azure Cosmos DB, seringkali paling mudah untuk memuat data dari kontainer ke dalam dataframe menggunakan bahasa yang didukung Spark seperti PySpark (implementasi khusus Spark dari Python) atau Scala (bahasa berbasis Java yang sering digunakan pada Spark).

Misalnya, kode PySpark berikut dapat digunakan untuk memuat dataframe bernama df dari data di kontainer my-container yang terhubung menggunakan layanan tertaut my_linked_service, dan menampilkan 10 baris pertama data:

 df = spark.read
     .format("cosmos.olap")\
     .option("spark.synapse.linkedService", "my_linked_service")\
     .option("spark.cosmos.container", "my-container")\
     .load()

display(df.limit(10))

Misalkan kontainer my-container digunakan untuk menyimpan item yang mirip dengan contoh berikut:

{
    "productID": 123,
    "productName": "Widget",
    "id": "7248f072-11c3-42b1-a368-...",
    "_rid": "mjMaAL...==",
    "_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
    "_etag": "\"54004b09-0000-2300-...\"",
    "_attachments": "attachments/",
    "_ts": 1655414791
}

Output dari kode PySpark akan mirip dengan tabel berikut:

_rid _ts productID productName id _etag
mjMaAL...== 1655414791 123 Widget 7248f072-11c3-42b1-a368-... 54004b09-0000-2300-...
mjMaAL...== 1655414829 124 Wotsit dc33131c-65c7-421a-a0f7-... 5400ca09-0000-2300-...
mjMaAL...== 1655414835 125 Thingumy ce22351d-78c7-428a-a1h5-... 5400ca09-0000-2300-...
... ... ... ... ... ...

Data dimuat dari penyimpanan analitis dalam kontainer, bukan dari penyimpanan operasional; memastikan bahwa tidak ada overhead kueri di penyimpanan operasional. Bidang di penyimpanan data analitis mencakup bidang yang ditentukan aplikasi (dalam hal ini productID dan productName) dan secara otomatis membuat bidang metadata.

Setelah memuat dataframe, Anda dapat menggunakan metode aslinya untuk menjelajahi data. Misalnya, kode berikut membuat dataframe baru yang hanya berisi kolom productID dan productName, yang diurutkan berdasarkan productName:

products_df = df.select("productID", "productName").orderBy("productName")

display(products_df.limit(10))

Output kode ini akan terlihat mirip dengan tabel ini:

productID productName
125 Thingumy
123 Widget
124 Wotsit
... ...

Menulis dataframe ke kontainer Cosmos DB

Dalam sebagian besar skenario HTAP, Anda harus menggunakan layanan tertaut untuk membaca data ke Spark dari penyimpanan analitis. Namun, Anda dapat menulis konten dataframe ke kontainer seperti yang ditunjukkan dalam contoh berikut:

mydf.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

Catatan

Menulis dataframe ke kontainer memperbarui penyimpanan operasional dan dapat berdampak pada performanya. Perubahan kemudian disinkronkan ke penyimpanan analitis.

Menggunakan Spark SQL untuk mengkueri data analitik Azure Cosmos DB

Spark SQL adalah API Spark yang menyediakan sintaks bahasa pemrogram SQL dan semantik database relasional dalam kumpulan Spark. Anda dapat menggunakan Spark SQL untuk menentukan metadata untuk tabel yang dapat dikueri menggunakan SQL.

Misalnya, kode berikut membuat tabel bernama Produk berdasarkan kontainer hipotetis yang digunakan dalam contoh sebelumnya:

%%sql

-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;

USE mydb;

-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
    spark.synapse.linkedService 'my_linked_service',
    spark.cosmos.container 'my-container'
);

-- Query the table
SELECT productID, productName
FROM products;

Tip

Kata kunci %%sql di awal kode adalah kata ajaib yang menginstruksikan kumpulan Spark untuk menjalankan kode sebagai SQL alih-alih bahasa default (yang biasanya diatur ke PySpark).

Dengan menggunakan pendekatan ini, Anda dapat membuat database logis di kumpulan Spark yang kemudian dapat Anda gunakan untuk mengkueri data analitik di Azure Cosmos DB untuk mendukung analisis data dan melaporkan beban kerja tanpa memengaruhi penyimpanan operasional di akun Azure Cosmos DB Anda.