الاستعلام عن بيانات Cosmos DB باستخدام Spark

مكتمل

بعد إضافة خدمة مرتبطة لمخزنك التحليلي الممكن لقاعدة بيانات Azure Cosmos DB، يمكنك استخدامها للاستعلام عن البيانات باستخدام تجمع Spark في مساحة عمل Azure Synapse Analytics.

تحميل البيانات التحليلية ل Azure Cosmos DB في إطار بيانات

للاستكشاف الأولي أو التحليل السريع للبيانات من خدمة مرتبطة ب Azure Cosmos DB، غالبا ما يكون من الأسهل تحميل البيانات من حاوية إلى إطار بيانات باستخدام لغة مدعومة من Spark مثل PySpark (تنفيذ خاص ب Spark ل Python) أو Scala (لغة تستند إلى Java غالبا ما تستخدم على Spark).

على سبيل المثال، يمكن استخدام التعليمات البرمجية PySpark التالية لتحميل إطار بيانات يسمى df من البيانات الموجودة في حاوية my-container المتصلة لاستخدام الخدمة المرتبطة my_linked_service، وعرض أول 10 صفوف من البيانات:

 df = spark.read
     .format("cosmos.olap")\
     .option("spark.synapse.linkedService", "my_linked_service")\
     .option("spark.cosmos.container", "my-container")\
     .load()

display(df.limit(10))

لنفترض أن حاوية my-container تستخدم لتخزين عناصر مشابهة للمثال التالي:

{
    "productID": 123,
    "productName": "Widget",
    "id": "7248f072-11c3-42b1-a368-...",
    "_rid": "mjMaAL...==",
    "_self": "dbs/mjM...==/colls/mjMaAL...=/docs/mjMaAL...==/",
    "_etag": "\"54004b09-0000-2300-...\"",
    "_attachments": "attachments/",
    "_ts": 1655414791
}

يجب أن يكون الإخراج من التعليمات البرمجية PySpark مشابهاً للجدول التالي:

_rid TS productID productName المعرف _etag
mjMaAL...== 1655414791 123 عنصر واجهة المستخدم 7248f072-11c3-42b1-a368-... 54004b09-0000-2300-...
mjMaAL...== 1655414829 124 Wotsit dc33131c-65c7-421a-a0f7-... 5400ca09-0000-2300-...
mjMaAL...== 1655414835 125 Thingumy ce22351d-78c7-428a-a1h5-... 5400ca09-0000-2300-...
... ... ... ... ... ...

يتم تحميل البيانات من المخزن التحليلي في الحاوية، وليس من المخزن التشغيلي؛ لضمان عدم وجود حمل استعلام على المخزن التشغيلي. تتضمن الحقول في مخزن البيانات التحليلية الحقول المعرفة من قبل التطبيق (في هذه الحالة productID و productName) وحقول بيانات التعريف التي تم إنشاؤها تلقائياً.

بعد تحميل إطار البيانات، يمكنك استخدام أساليبه الأصلية لاستكشاف البيانات. على سبيل المثال، تنشئ التعليمات البرمجية التالية إطار بيانات جديدا يحتوي فقط على أعمدة productID و productName، مرتبة بواسطة productName:

products_df = df.select("productID", "productName").orderBy("productName")

display(products_df.limit(10))

سيبدو إخراج هذه التعليمة البرمجية مشابهاً لهذا الجدول:

productID productName
125 Thingumy
123 عنصر واجهة المستخدم
124 Wotsit
... ...

كتابة إطار بيانات إلى حاوية Cosmos DB

في معظم سيناريوهات HTAP، يجب عليك استخدام الخدمة المرتبطة لقراءة البيانات في Spark من المخزن التحليلي. ومع ذلك ، يمكنك كتابة محتويات إطار بيانات إلى الحاوية كما هو موضح في المثال التالي:

mydf.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "my_linked_service")\
    .option("spark.cosmos.container", "my-container")\
    .mode('append')\
    .save()

إشعار

تؤدي كتابة إطار بيانات إلى حاوية إلى تحديث المخزن التشغيلي ويمكن أن يكون لذلك تأثير على أدائه. ثم تتم مزامنة التغييرات إلى المخزن التحليلي.

استخدام Spark SQL للاستعلام عن البيانات التحليلية ل Azure Cosmos DB

Spark SQL هو Spark API يوفر بناء جملة لغة SQL ودلالات قاعدة البيانات الارتباطية في تجمع Spark. يمكنك استخدام Spark SQL لتعريف بيانات التعريف للجداول التي يمكن الاستعلام عنها باستخدام SQL.

على سبيل المثال، تنشئ التعليمات البرمجية التالية جدولاً باسم Products استناداً إلى الحاوية الافتراضية المستخدمة في الأمثلة السابقة:

%%sql

-- Create a logical database in the Spark metastore
CREATE DATABASE mydb;

USE mydb;

-- Create a table from the Cosmos DB container
CREATE TABLE products using cosmos.olap options (
    spark.synapse.linkedService 'my_linked_service',
    spark.cosmos.container 'my-container'
);

-- Query the table
SELECT productID, productName
FROM products;

تلميح

الكلمة الأساسية %%sql في بداية التعليمات البرمجية هي magic التي توجه تجمع Spark لتشغيل التعليمات البرمجية كـ SQL بدلاً من اللغة الافتراضية (التي يتم تعيينها عادة إلى PySpark).

باستخدام هذا الأسلوب، يمكنك إنشاء قاعدة بيانات منطقية في تجمع Spark الخاص بك يمكنك استخدامها بعد ذلك للاستعلام عن البيانات التحليلية في Azure Cosmos DB لدعم تحليل البيانات والإبلاغ عن أحمال العمل دون التأثير على المخزن التشغيلي في حساب Azure Cosmos DB الخاص بك.