Mengkueri data Cosmos DB dengan Spark
Setelah menambahkan layanan tertaut untuk database Azure Cosmos DB yang diaktifkan penyimpanan analitik, Anda dapat menggunakannya untuk mengkueri data menggunakan kumpulan Spark di ruang kerja Azure Synapse Analytics Anda.
Memuat data analitik Azure Cosmos DB ke dalam dataframe
Untuk eksplorasi awal atau analisis cepat data dari layanan tertaut Azure Cosmos DB, seringkali paling mudah untuk memuat data dari kontainer ke dalam dataframe menggunakan bahasa yang didukung Spark seperti PySpark (implementasi khusus Spark dari Python) atau Scala (bahasa berbasis Java yang sering digunakan pada Spark).
Misalnya, kode PySpark berikut dapat digunakan untuk memuat dataframe bernama df dari data di kontainer my-container yang terhubung menggunakan layanan tertaut my_linked_service, dan menampilkan 10 baris pertama data:
df = spark.read
.format("cosmos.olap")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.load()
display(df.limit(10))
Misalkan kontainer my-container digunakan untuk menyimpan item yang mirip dengan contoh berikut:
{
"productID": 123,
"productName": "Widget",
"id": "7248f072-11c3-42b1-a368-...",
"_rid": "mjMaAL...==",
"_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
"_etag": "\"54004b09-0000-2300-...\"",
"_attachments": "attachments/",
"_ts": 1655414791
}
Output dari kode PySpark akan mirip dengan tabel berikut:
_rid | _ts | productID | productName | id | _etag |
---|---|---|---|---|---|
mjMaAL...== | 1655414791 | 123 | Widget | 7248f072-11c3-42b1-a368-... | 54004b09-0000-2300-... |
mjMaAL...== | 1655414829 | 124 | Wotsit | dc33131c-65c7-421a-a0f7-... | 5400ca09-0000-2300-... |
mjMaAL...== | 1655414835 | 125 | Thingumy | ce22351d-78c7-428a-a1h5-... | 5400ca09-0000-2300-... |
... | ... | ... | ... | ... | ... |
Data dimuat dari penyimpanan analitis dalam kontainer, bukan dari penyimpanan operasional; memastikan bahwa tidak ada overhead kueri di penyimpanan operasional. Bidang di penyimpanan data analitis mencakup bidang yang ditentukan aplikasi (dalam hal ini productID dan productName) dan secara otomatis membuat bidang metadata.
Setelah memuat dataframe, Anda dapat menggunakan metode aslinya untuk menjelajahi data. Misalnya, kode berikut membuat dataframe baru yang hanya berisi kolom productID dan productName, yang diurutkan berdasarkan productName:
products_df = df.select("productID", "productName").orderBy("productName")
display(products_df.limit(10))
Output kode ini akan terlihat mirip dengan tabel ini:
productID | productName |
---|---|
125 | Thingumy |
123 | Widget |
124 | Wotsit |
... | ... |
Menulis dataframe ke kontainer Cosmos DB
Dalam sebagian besar skenario HTAP, Anda harus menggunakan layanan tertaut untuk membaca data ke Spark dari penyimpanan analitis. Namun, Anda dapat menulis konten dataframe ke kontainer seperti yang ditunjukkan dalam contoh berikut:
mydf.write.format("cosmos.oltp")\
.option("spark.synapse.linkedService", "my_linked_service")\
.option("spark.cosmos.container", "my-container")\
.mode('append')\
.save()
Catatan
Menulis dataframe ke kontainer memperbarui penyimpanan operasional dan dapat berdampak pada performanya. Perubahan kemudian disinkronkan ke penyimpanan analitis.
Menggunakan Spark SQL untuk mengkueri data analitik Azure Cosmos DB
Spark SQL adalah API Spark yang menyediakan sintaks bahasa pemrogram SQL dan semantik database relasional dalam kumpulan Spark. Anda dapat menggunakan Spark SQL untuk menentukan metadata untuk tabel yang dapat dikueri menggunakan SQL.
Misalnya, kode berikut membuat tabel bernama Produk berdasarkan kontainer hipotetis yang digunakan dalam contoh sebelumnya:
%%sql
-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;
USE mydb;
-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
spark.synapse.linkedService 'my_linked_service',
spark.cosmos.container 'my-container'
);
-- Query the table
SELECT productID, productName
FROM products;
Tip
Kata kunci %%sql
di awal kode adalah kata ajaib yang menginstruksikan kumpulan Spark untuk menjalankan kode sebagai SQL alih-alih bahasa default (yang biasanya diatur ke PySpark).
Dengan menggunakan pendekatan ini, Anda dapat membuat database logis di kumpulan Spark yang kemudian dapat Anda gunakan untuk mengkueri data analitik di Azure Cosmos DB untuk mendukung analisis data dan melaporkan beban kerja tanpa memengaruhi penyimpanan operasional di akun Azure Cosmos DB Anda.