تفاعل مع Azure Cosmos DB باستخدام Apache Spark 2 في Azure Synapse Link

ملاحظة

بالنسبة إلى Azure Synapse Link ل Azure Cosmos DB باستخدام Spark 3، راجع هذه المقالة Azure Synapse Link ل Azure Cosmos DB على Spark 3

في هذه المقالة، ستتعلم كيفية التفاعل مع Azure Cosmos DB باستخدام Synapse Apache Spark 2. من خلال دعمه الكامل لـ Scala وPython وSparkSQL و#C، يعد Synapse Apache Spark مركزيًا في سيناريوهات التحليلات وهندسة البيانات وعلوم البيانات واستكشاف البيانات في Azure Synapse Link لـ Azure Cosmos DB.

يتم دعم الإمكانات التالية في أثناء التفاعل مع Azure Cosmos DB:

  • يسمح لك Synapse Apache Spark بتحليل البيانات في حاويات Azure Cosmos DB التي تم تمكينها باستخدام Azure Synapse Link في الوقت الفعلي تقريبًا دون التأثير على أداء أعباء العمل الخاصة بالمعاملات. يتوفر الخياران التاليان للاستعلام عن متجر Azure Cosmos DB التحليلي من Spark:
    • تحميل إلى Spark DataFrame
    • إنشاء جدول Spark
  • يتيح لك Synapse Apache Spark أيضًا استيعاب البيانات في Azure Cosmos DB. من المهم ملاحظة أنه يتم دائمًا استيعاب البيانات في حاوياتAzure Cosmos DB من خلال متجر المعاملات. عند تمكين Synapse Link، تتم مزامنة أي إدخالات وتحديثات وحذف جديدة تلقائيًا مع المتجر التحليلي.
  • يدعم Synapse Apache Spark أيضًا تدفق Spark المنظم باستخدام Azure Cosmos DB كمصدر بالإضافة إلى متلقٍ.

توجهك الأقسام التالية عبر بناء جملة القدرات المذكورة أعلاه. يمكنك أيضا سحب وحدة Learn النمطية حول كيفية الاستعلام عن Azure Cosmos DB باستخدام Apache Spark ل Azure Synapse Analytics. تم تصميم الإيماءات في مساحة عمل Azure Synapse Analytics لتوفير تجربة سهلة لبدء الاستخدام. تظهر الإيماءات عند النقر بزر الماوس الأيمن على حاوية Azure Cosmos DB في علامة التبويب ⁧⁩البيانات⁧⁩ في مساحة عمل Synapse. باستخدام الإيماءات، يمكنك إنشاء التعليمات البرمجية بسرعة وتخصيصها وفقًا لاحتياجاتك. تعتبر الإيماءات أيضًا مثالية لاكتشاف البيانات بنقرة واحدة.

هام

يجب أن تكون على دراية ببعض القيود في المخطط التحليلي التي قد تؤدي إلى سلوك غير متوقع في عمليات تحميل البيانات. على سبيل المثال، لا تتوفر في المخطط التحليلي سوى أول 1000 خاصية من مخطط المعاملات، ولا تتوفر الخصائص ذات المسافات، وما إلى ذلك. إذا كنت تواجه بعض النتائج غير المتوقعة، فتحقق من ⁧⁩قيود مخطط المتجر التحليلي⁧⁩ لمزيد من التفاصيل.

استعلام عن المخزن التحليلي Azure Cosmos DB

قبل أن تتعرف على الخيارين المحتملين للاستعلام عن متجر Azure Cosmos DB التحليلي، والتحميل إلى Spark DataFrame وإنشاء جدول Spark، يجدر بك استكشاف الاختلافات في التجربة حتى تتمكن من اختيار الخيار الذي يناسب احتياجاتك.

يدور الاختلاف في التجربة حول ما إذا كانت تغييرات البيانات الأساسية في حاوية Azure Cosmos DB يجب أن تنعكس تلقائيًا في التحليل الذي يتم إجراؤه في Spark. عندما يتم تسجيل Spark DataFrame أو إنشاء جدول Spark مقابل المخزن التحليلي للحاوية، يتم جلب بيانات التعريف حول اللقطة الحالية للبيانات في المخزن التحليلي إلى Spark من أجل الدفع الفعال للتحليل اللاحق. من المهم ملاحظة أنه نظرًا إلى أن Spark تتبع سياسة تقييم كسول، ما لم يتم استدعاء إجراء على Spark DataFrame أو تنفيذ استعلام SparkSQL مقابل جدول Spark، لا يتم جلب البيانات الفعلية من المخزن التحليلي للحاوية الأساسية.

في حالة ⁧⁩التحميل إلى Spark DataFrame⁧⁩، يتم تخزين بيانات التعريف التي تم جلبها مؤقتًا خلال مدة جلسة Spark، ومن ثمَّ يتم تقييم الإجراءات اللاحقة التي يتم استدعاؤها في DataFrame مقابل لقطة المتجر التحليلي في وقت إنشاء DataFrame.

من ناحية أخرى، في حالة ⁧⁩إنشاء جدول Spark⁧⁩، لا يتم تخزين بيانات التعريف لحالة المتجر التحليلية مؤقتًا في Spark وتتم إعادة تحميلها في كل تنفيذ استعلام SparkSQL مقابل جدول Spark.

ومن ثمَّ، يمكنك الاختيار بين التحميل إلى Spark DataFrame وإنشاء جدول Spark بناءً على ما إذا كنت تريد تقييم تحليل Spark مقابل لقطة ثابتة من المتجر التحليلي أو مقابل أحدث لقطة من المتجر التحليلي على التوالي.

إذا كانت الاستعلامات التحليلية الخاصة بك تستخدم عوامل التصفية بشكل متكرر، فلديك خيار التقسيم استنادا إلى هذه الحقول للحصول على استعلام أفضل. يمكنك تنفيذ مهمة التقسيم من دفتر ملاحظات Azure Synaps Spark بشكل دوري، لتشغيل التقسيم على مخزن تحليلي. يشير هذا المخزن المقسم إلى حساب التخزين الأساسي ADLS Ge2 المرتبط بمساحة Azure Synapse. لمعرفة المزيد، راجع مقدمة للتقسيم المعتاد وكيفية تكوين التقسيم المعتادللمقالات.

ملاحظة

للاستعلام عن Azure Cosmos DB لحسابات MongoDB، تعرف على المزيد حول تمثيل مخطط الدقة الكامل في المخزن التحليلي وأسماء الخصائص الموسعة التي سيتم استخدامها.

ملاحظة

يرجى ملاحظة أن جميع options في الأوامر أدناه حساسة لحالة الأحرف. على سبيل المثال، يجب عليك استخدام Gatewayبينما سيعرضgateway خطأ.

تحميل إلى Spark DataFrame

في هذا المثال، ستقوم بإنشاء Spark DataFrame الذي يشير إلى مخزن Azure Cosmos DB التحليلي. يمكنك بعد ذلك إجراء تحليل إضافي عن طريق استدعاء إجراءات Spark وفق DataFrame. لا تؤثر هذه العملية على متجر المعاملات.

سيكون بناء الجملة في Python كما يلي:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

df = spark.read.format("cosmos.olap")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .load()

ستكون البنية المكافئة في ⁧⁩Scala⁧⁩ كما يلي:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    load()

إنشاء جدول Spark

في هذا المثال، ستقوم بإنشاء جدول Spark يشير إلى مخزنAzure Cosmos DB التحليلي. يمكنك بعد ذلك إجراء تحليل إضافي عن طريق استدعاء استعلامات SparkSQL مقابل الجدول. لا تؤثر هذه العملية على متجر المعاملات ولا تتطلب أي حركة للبيانات. إذا قررت حذف جدول Spark هذا، فلن تتأثر حاوية Azure Cosmos DB الأساسية والمخزن التحليلي المقابل.

هذا السيناريو مناسب لإعادة استخدام جداول Spark من خلال أدوات الجهات الخارجية وتوفير إمكانية الوصول إلى البيانات الأساسية لوقت التشغيل.

تكون صيغة إنشاء جدول Spark كما يلي:

%%sql
-- To select a preferred list of regions in a multi-region Azure Cosmos DB account, add spark.cosmos.preferredRegions '<Region1>,<Region2>' in the config options

create table call_center using cosmos.olap options (
    spark.synapse.linkedService '<enter linked service name>',
    spark.cosmos.container '<enter container name>'
)

ملاحظة

إذا كانت لديك سيناريوهات يتغير فيها مخطط حاوية Azure Cosmos DB الأساسية بمرور الوقت؛ وإذا كنت تريد أن ينعكس المخطط المُحدث تلقائيًا في الاستعلامات مقابل جدول Spark، فيمكنك تحقيق ذلك من خلال تعيين الخيار spark.cosmos.autoSchemaMerge على true في خيارات جدول Spark.

اكتب Spark DataFrame في حاوية Azure Cosmos DB

في هذا المثال، ستكتب Spark DataFrame في حاوية Azure Cosmos DB. ستؤثر هذه العملية على أداء أعباء العمل الخاصة بالمعاملات وتستهلك وحدات الطلب المتوفرة في حاوية Azure Cosmos DB أو قاعدة البيانات المشتركة.

سيكون بناء الجملة في Python كما يلي:

# Write a Spark DataFrame into an Azure Cosmos DB container
# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

YOURDATAFRAME.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()

ستكون البنية المكافئة في ⁧⁩Scala⁧⁩ كما يلي:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

import org.apache.spark.sql.SaveMode

df.write.format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>"). 
    option("spark.cosmos.write.upsertEnabled", "true").
    mode(SaveMode.Overwrite).
    save()

تحميل DataFrame المتدفقة من الحاوية

في هذه الإيماءة، ستستخدم قدرة Spark Streaming لتحميل البيانات من حاوية إلى إطار بيانات. سيتم تخزين البيانات في حساب مجموعة البيانات الأساسي (ونظام الملفات) الذي قمت بتوصيله بمساحة العمل.

ملاحظة

إذا كنت تتطلع إلى الرجوع إلى مكتبات خارجية في Synapse Apache Spark، فتعرَّف على المزيد ⁧⁩هنا⁧⁩. على سبيل المثال، إذا كنت تبحث عن استيعاب Spark DataFrame إلى حاوية Azure Cosmos DB ل MongoDB، يمكنك استخدام موصل MongoDB ل Spark.

تحميل تدفق DataFrame من حاوية Azure Cosmos DB

في هذا المثال، ستستخدم قدرة Spark المهيكلة على الدفق لتحميل البيانات من حاوية Azure Cosmos DB إلى Spark المتدفقة DataFrame باستخدام وظيفة تغذية التغيير في Azure Cosmos DB. سيتم تخزين بيانات نقطة التحقق المستخدمة بواسطة Spark في حساب بحيرة البيانات الأساسي (ونظام الملفات) الذي قمت بتوصيله بمساحة العمل.

إذا لم يتم إنشاء المجلد localReadCheckpointFolder/ (في المثال أدناه)، فسيتم إنشاؤه تلقائيًا. ستؤثر هذه العملية على أداء أعباء العمل الخاصة بالمعاملات وتستهلك وحدات الطلب المتوفرة في حاوية Azure Cosmos DB أو قاعدة البيانات المشتركة.

سيكون بناء الجملة في Python كما يلي:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

dfStream = spark.readStream\
    .format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.changeFeed.readEnabled", "true")\
    .option("spark.cosmos.changeFeed.startFromTheBeginning", "true")\
    .option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder")\
    .option("spark.cosmos.changeFeed.queryName", "streamQuery")\
    .load()

ستكون البنية المكافئة في ⁧⁩Scala⁧⁩ كما يلي:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

val dfStream = spark.readStream.
    format("cosmos.oltp").
    option("spark.synapse.linkedService", "<enter linked service name>").
    option("spark.cosmos.container", "<enter container name>").
    option("spark.cosmos.changeFeed.readEnabled", "true").
    option("spark.cosmos.changeFeed.startFromTheBeginning", "true").
    option("spark.cosmos.changeFeed.checkpointLocation", "/localReadCheckpointFolder").
    option("spark.cosmos.changeFeed.queryName", "streamQuery").
    load()

اكتب دفق DataFrame إلى حاوية Azure Cosmos DB

في هذا المثال، ستكتب دفق DataFrame في حاوية Azure Cosmos DB. ستؤثر هذه العملية على أداء أعباء العمل الخاصة بالمعاملات وتستهلك وحدات الطلب المتوفرة في حاوية Azure Cosmos DB أو قاعدة البيانات المشتركة. إذا لم يتم إنشاء المجلد ⁧⁩localWriteCheckpointFolder/⁧⁩ (في المثال أدناه)، فسيتم إنشاؤه تلقائيًا.

سيكون بناء الجملة في ⁧⁩Python⁧⁩ كما يلي:

# To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

# If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

def writeBatchToCosmos(batchDF, batchId):
  batchDF.persist()
  print("--> BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.write.format("cosmos.oltp")\
    .option("spark.synapse.linkedService", "<enter linked service name>")\
    .option("spark.cosmos.container", "<enter container name>")\
    .option("spark.cosmos.write.upsertEnabled", "true")\
    .mode('append')\
    .save()
  print("<-- BatchId: {}, Document count: {} : {}".format(batchId, batchDF.count(), datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S.%f")))
  batchDF.unpersist()

streamQuery = dfStream\
        .writeStream\
        .foreachBatch(writeBatchToCosmos) \
        .option("checkpointLocation", "/localWriteCheckpointFolder")\
        .start()

streamQuery.awaitTermination()

ستكون البنية المكافئة في ⁧⁩Scala⁧⁩ كما يلي:

// To select a preferred list of regions in a multi-region Azure Cosmos DB account, add .option("spark.cosmos.preferredRegions", "<Region1>,<Region2>")

// If you are using managed private endpoints for Azure Cosmos DB analytical store and using batch writes/reads and/or streaming writes/reads to transactional store you should set connectionMode to Gateway. 

val query = dfStream.
            writeStream.
            foreachBatch { (batchDF: DataFrame, batchId: Long) =>
              batchDF.persist()
              batchDF.write.format("cosmos.oltp").
                option("spark.synapse.linkedService", "<enter linked service name>").
                option("spark.cosmos.container", "<enter container name>"). 
                option("spark.cosmos.write.upsertEnabled", "true").
                mode(SaveMode.Overwrite).
                save()
              println(s"BatchId: $batchId, Document count: ${batchDF.count()}")
              batchDF.unpersist()
              ()
            }.        
            option("checkpointLocation", "/localWriteCheckpointFolder").
            start()

query.awaitTermination()

الخطوات التالية