نسخ البيانات من Azure Cosmos DB إلى تجمع SQL مخصص باستخدام Apache Spark

رابط Azure Synapse لـ Azure Cosmos DB يمكّن المستخدمين من تشغيل التحليلات في الوقت الفعلي تقريبًا عبر البيانات التشغيلية في Azure Cosmos DB. ومع ذلك، هناك أوقات تحتاج فيها بعض البيانات إلى التجميع والإثراء لخدمة مستخدمي مستودع البيانات. يمكن إجراء تنظيم وتصدير بيانات ارتباط Azure Synapse باستخدام عدد قليل من الخلايا في دفتر الملاحظات.

المتطلبات الأساسية

‏‏الخطوات

في هذا البرنامج التعليمي، سوف تتصل بالمتجر التحليلي حتى لا يكون هناك أي تأثير على متجر المعاملات (لن يستهلك أي وحدات طلب). سنمر بالخطوات التالية:

  1. قراءة حاوية Azure Cosmos DB HTAP في إطار بيانات Spark
  2. تجميع النتائج في إطار بيانات جديد
  3. استيعاب البيانات في تجمع SQL مخصص

Spark إلى SQL Steps 1

البيانات

في هذا المثال، نستخدم حاوية HTAP تسمى ⁧⁩RetailSales⁧⁩. وهي جزء من خدمة مرتبطة تسمى ⁧⁩ConnectedData⁧⁩، وتحتوي على المخطط التالي:

  • _rid: سلسلة (قيمة خالية = صحيح)
  • _ts: طويل (قيمة خالية = صحيح)
  • logQuantity: مزدوج (قيمة خالية = صحيح)
  • productCode: سلسلة (قيمة خالية = صحيح)
  • الكمية: طويلة (قيمة خالية = صحيح)
  • السعر: طويل (قيمة خالية = صحيح)
  • المعرف: سلسلة (قيمة خالية = صحيح)
  • الإعلان: طويل (قيمة خالية = صحيح)
  • storeId: طويل (قيمة خالية = صحيح)
  • weekStarting: طويل (قيمة خالية = صحيح)
  • _etag: سلسلة (قيمة خالية = صحيح)

سنقوم بتجميع المبيعات⁧⁩(الكمية⁧⁩⁧⁩والإيرادات⁧⁩ (السعر × الكمية) حسب ⁧⁩productCode⁧⁩ و⁧⁩weekStarting⁧⁩ لأغراض الإبلاغ. وأخيرًا، سنقوم بتصدير تلك البيانات إلى جدول تجمع SQL مخصص يسمى ⁧dbo.productsales⁩.

تكوين دفتر ملاحظات Spark

إنشاء دفتر ملاحظات Spark باستخدام Scala كـ Spark كلغة رئيسية. نستخدم الإعداد الافتراضي لدفتر الملاحظات لجلسة العمل.

قراءة البيانات في Spark

اقرأ حاوية Azure Cosmos DB HTAP مع Spark في إطار بيانات في الخلية الأولى.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

تجميع النتائج في إطار بيانات جديد

في الخلية الثانية، نقوم بتشغيل التحويل والمجاميع اللازمة لإطار البيانات الجديد قبل تحميله في قاعدة بيانات تجمع SQL مخصصة.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

تحميل النتائج في تجمع SQL مخصص

في الخلية الثالثة، نقوم بتحميل البيانات في تجمع SQL مخصص. سيقوم تلقائيًا بإنشاء جدول خارجي مؤقت ومصدر بيانات خارجي وتنسيق ملف خارجي سيتم حذفه بمجرد الانتهاء من المهمة.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

الاستعلام عن النتائج باستخدام SQL

يمكنك الاستعلام عن النتيجة باستخدام استعلام SQL بسيط مثل البرنامج النصي SQL التالي:

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

سيقدم الاستعلام النتائج التالية في وضع المخطط: Spark إلى SQL Steps 2

الخطوات التالية