نسخ البيانات من Azure Cosmos DB إلى تجمع SQL مخصص باستخدام Apache Spark
رابط Azure Synapse لـ Azure Cosmos DB يمكّن المستخدمين من تشغيل التحليلات في الوقت الفعلي تقريبًا عبر البيانات التشغيلية في Azure Cosmos DB. ومع ذلك، هناك أوقات تحتاج فيها بعض البيانات إلى التجميع والإثراء لخدمة مستخدمي مستودع البيانات. يمكن إجراء تنظيم وتصدير بيانات ارتباط Azure Synapse باستخدام عدد قليل من الخلايا في دفتر الملاحظات.
المتطلبات الأساسية
- توفير مساحة عمل Synapse مع:
- توفير حساب Azure Cosmos DB مع حاوية HTAP مع البيانات
- توصيل حاوية Azure Cosmos DB HTAP بمساحة العمل
- الحصول على الإعداد المناسب لاستيراد البيانات إلى تجمع SQL مخصص من Spark
الخطوات
في هذا البرنامج التعليمي، سوف تتصل بالمتجر التحليلي حتى لا يكون هناك أي تأثير على متجر المعاملات (لن يستهلك أي وحدات طلب). سنمر بالخطوات التالية:
- قراءة حاوية Azure Cosmos DB HTAP في إطار بيانات Spark
- تجميع النتائج في إطار بيانات جديد
- استيعاب البيانات في تجمع SQL مخصص
بيانات
في هذا المثال، نستخدم حاوية HTAP تسمى RetailSales. إنه جزء من خدمة مرتبطة تسمى ConnectedData، ويحتوي على المخطط التالي:
- _rid: سلسلة (قيمة خالية = صحيح)
- _ts: طويل (قيمة خالية = صحيح)
- logQuantity: مزدوج (قيمة خالية = صحيح)
- productCode: سلسلة (قيمة خالية = صحيح)
- الكمية: طويلة (قيمة خالية = صحيح)
- السعر: طويل (قيمة خالية = صحيح)
- المعرف: سلسلة (قيمة خالية = صحيح)
- الإعلان: طويل (قيمة خالية = صحيح)
- storeId: طويل (قيمة خالية = صحيح)
- weekStarting: طويل (قيمة خالية = صحيح)
- _etag: سلسلة (قيمة خالية = صحيح)
سنقوم بتجميع المبيعات (الكمية والإيرادات (السعر x الكمية) حسب productCode و weekStarting لأغراض إعداد التقارير. وأخيرا، سنقوم بتصدير تلك البيانات إلى جدول تجمع SQL مخصص يسمى dbo.productsales
.
تكوين دفتر ملاحظات Spark
إنشاء دفتر ملاحظات Spark باستخدام Scala كـ Spark كلغة رئيسية. نستخدم الإعداد الافتراضي لدفتر الملاحظات لجلسة العمل.
قراءة البيانات في Spark
اقرأ حاوية Azure Cosmos DB HTAP مع Spark في إطار بيانات في الخلية الأولى.
val df_olap = spark.read.format("cosmos.olap").
option("spark.synapse.linkedService", "ConnectedData").
option("spark.cosmos.container", "RetailSales").
load()
تجميع النتائج في إطار بيانات جديد
في الخلية الثانية، نقوم بتشغيل التحويل والمجاميع اللازمة لإطار البيانات الجديد قبل تحميله في قاعدة بيانات تجمع SQL مخصصة.
// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))
تحميل النتائج في تجمع SQL مخصص
في الخلية الثالثة، نقوم بتحميل البيانات في تجمع SQL مخصص. سيقوم تلقائيًا بإنشاء جدول خارجي مؤقت ومصدر بيانات خارجي وتنسيق ملف خارجي سيتم حذفه بمجرد الانتهاء من المهمة.
df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)
الاستعلام عن النتائج باستخدام SQL
يمكنك الاستعلام عن النتيجة باستخدام استعلام SQL بسيط مثل البرنامج النصي SQL التالي:
SELECT [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
FROM [dbo].[productsales]
سيعرض الاستعلام النتائج التالية في وضع المخطط: