مشاركة عبر


تحديد وضع الإخراج للبث المنظم

تتناول هذه المقالة تحديد وضع إخراج للبث ذي الحالة. تتطلب التدفقات ذات الحالة التي تحتوي على التجميعات تكوين وضع الإخراج فقط.

تدعم الصلات وضع إخراج الإلحاق فقط، ولا يؤثر وضع الإخراج على إلغاء التكرار. عوامل التشغيل ذات الحالة التعسفية mapGroupsWithState وتبعث flatMapGroupsWithState السجلات باستخدام المنطق المخصص الخاص بها، لذلك لا يؤثر وضع إخراج الدفق على سلوكهم.

بالنسبة للتدفق عديم الحالة، تتصرف جميع أوضاع الإخراج بنفس الطريقة.

لتكوين وضع الإخراج بشكل صحيح، يجب فهم الدفق والعلامات المائية والمشغلات ذات الحالة. راجع المقالات التالية:

ما هو وضع الإخراج؟

يحدد وضع إخراج استعلام Structured Streaming الذي يسجل عوامل تشغيل الاستعلام التي تنبعث أثناء كل مشغل. الأنواع الثلاثة للسجلات التي يمكن إصدارها هي:

  • السجلات التي لا تتغير المعالجة المستقبلية.
  • السجلات التي تغيرت منذ المشغل الأخير.
  • كافة السجلات في جدول الحالة.

معرفة أنواع السجلات التي سيتم إرسالها أمر مهم لعوامل التشغيل ذات الحالة لأن صفا معينا ينتجه عامل تشغيل ذي حالة قد يتغير من مشغل إلى مشغل. على سبيل المثال، نظرا لأن عامل تجميع البث يتلقى المزيد من الصفوف لنافذة معينة، فقد تتغير قيم تجميع تلك النافذة عبر المشغلات.

بالنسبة إلى عوامل التشغيل عديمة الحالة، لا يؤثر التمييز بين أنواع السجلات على سلوك عامل التشغيل. السجلات التي يصدرها عامل التشغيل عديم الحالة أثناء المشغل هي دائما السجلات المصدر التي تتم معالجتها أثناء هذا المشغل.

أوضاع الإخراج المتوفرة

هناك ثلاثة أوضاع إخراج تخبر عامل التشغيل بالسجلات التي سيتم إرسالها أثناء مشغل معين:

وضع الإخراج ‏‏الوصف
وضع الإلحاق (افتراضي) بشكل افتراضي، يتم تشغيل الاستعلامات المتدفقة في وضع الإلحاق. في هذا الوضع، تصدر عوامل التشغيل فقط صفوفا لا تتغير في المشغلات المستقبلية. تستخدم عوامل التشغيل ذات الحالة العلامة المائية لتحديد وقت حدوث ذلك.
وضع التحديث في وضع التحديث، تصدر عوامل التشغيل جميع الصفوف التي تغيرت أثناء المشغل، حتى إذا كان السجل المنبعث قد يتغير في مشغل لاحق.
وضع الإكمال يعمل الوضع الكامل فقط مع تجميعات البث. في الوضع الكامل، يتم إصدار جميع الصفوف الناتجة من أي وقت مضى التي تنتجها عامل التشغيل انتقال البيانات من الخادم.

اعتبارات الإنتاج

بالنسبة للعديد من عمليات الدفق ذات الحالة، يجب الاختيار بين أوضاع الإلحاق والتحديث. توضح الأقسام التالية الاعتبارات التي قد تبلغ قرارك.

إشعار

يحتوي الوضع الكامل على بعض التطبيقات، ولكن يمكن أن يكون أداؤه ضعيفا مع تحجيم البيانات. توصي Databricks باستخدام طرق العرض المجسدة للحصول على ضمانات دلالية مرتبطة بالوضع الكامل مع المعالجة التزايدية للعديد من العمليات ذات الحالة. راجع استخدام طرق العرض المجسدة في Databricks SQL.

دلالات التطبيق

تصف دلالات التطبيق كيفية استخدام تطبيقات انتقال البيانات من الخادم إلى البيانات المتدفقة.

إذا كانت خدمات انتقال البيانات من الخادم تحتاج إلى اتخاذ إجراء واحد لكل كتابة في المراحل النهائية، فاستخدم وضع الإلحاق في معظم الحالات. على سبيل المثال، إذا كانت لديك خدمة إعلامات انتقال البيانات من الخادم ترسل إعلامات لكل سجل جديد مكتوب إلى المتلقي، يضمن وضع الإلحاق كتابة كل سجل مرة واحدة فقط. يكتب وضع التحديث السجل في كل مرة تتغير فيها معلومات الحالة، مما قد يؤدي إلى العديد من التحديثات.

إذا كانت خدمات انتقال البيانات من الخادم بحاجة إلى نتائج جديدة، فإن وضع التحديث يضمن بقاء المتلقي محدثا قدر الإمكان. تتضمن الأمثلة نموذج التعلم الآلي الذي يقرأ الميزات في الوقت الفعلي أو لوحة معلومات التحليلات التي تتبع التجميعات في الوقت الحقيقي.

توافق المشغل والمتلقي

لا يدعم الدفق المنظم جميع العمليات المتوفرة في Apache Spark، وبعض عمليات الدفق غير مدعومة في جميع أوضاع الإخراج. لمزيد من التفاصيل حول قيود المشغل، راجع مستندات تدفق OSS.

لا تدعم جميع المتلقيات جميع أوضاع الإخراج. يدعم كل من Delta Lake، الذي يدعم جميع الجداول المدارة لكتالوج Unity، وKafka جميع أوضاع الإخراج. لمزيد من التفاصيل حول توافق المتلقي، راجع مستندات تدفق OSS.

زمن الانتقال والتكلفة

يؤثر وضع الإخراج على مقدار الوقت الذي يجب أن ينقضي قبل كتابة سجل، ويمكن أن يؤثر تكرار ومقدار البيانات المكتوبة على التكاليف المرتبطة بالتدفقات المتدفقة.

يجبر وضع الإلحاق عوامل التشغيل ذات الحالة على إرسال النتائج فقط بعد الانتهاء من النتائج ذات الحالة، وهو ما دام تأخير العلامة المائية على الأقل. يعني تأخير العلامة المائية 1 hour في وضع إخراج الإلحاق أن السجلات لديك لها تأخير لمدة ساعة واحدة على الأقل قبل أن يتم إرسالها في المراحل النهائية.

ينتج عن وضع التحديث كتابة واحدة لكل مشغل لكل قيمة تجميعية. إذا كانت رسوم المتلقي لكل كتابة لكل سجل، فقد يكون هذا مكلفا إذا تم تحديث السجلات عدة مرات قبل مرور تأخير العلامة المائية.

أمثلة التكوين

تظهر أمثلة التعليمات البرمجية التالية تكوين وضع الإخراج لتدفق التحديثات إلى جداول كتالوج Unity:

Python

# Append output mode (default)
(df.writeStream
  .toTable("target_table")
)

# Append output mode (same as default behavior)
(df.writeStream
  .outputMode("append")
  .toTable("target_table")
)

# Update output mode
(df.writeStream
  .outputMode("update")
  .toTable("target_table")
)

# Complete output mode
(df.writeStream
  .outputMode("complete")
  .toTable("target_table")
)

Scala

// Append output mode (default)
df.writeStream
  .toTable("target_table")

// Append output mode (same as default behavior)
df.writeStream
  .outputMode("append")
  .toTable("target_table")

// Update output mode
df.writeStream
  .outputMode("update")
  .toTable("target_table")

// Complete output mode
df.writeStream
  .outputMode("complete")
  .toTable("target_table")

راجع مستندات OSS ل PySpark DataStreamWriter.outputMode أو Scala DataStreamWriter.outputMode.

مثال على أوضاع التدفق والإخراج ذات الحالة

يهدف المثال التالي إلى مساعدتك في التفكير من خلال كيفية تفاعل وضع الإخراج مع العلامات المائية للبث ذي الحالة.

ضع في اعتبارك تجميع تدفق يحسب إجمالي الإيرادات التي يتم إنشاؤها كل ساعة في متجر مع تأخير العلامة المائية لمدة 15 دقيقة. تعالج النسخة الصغيرة الأولى السجلات التالية:

  • 15 دولار في الساعة 2:40 مساء
  • 10 دولارات في الساعة 2:30 مساء
  • 30 دولار في الساعة 3:10 مساء

عند هذه النقطة، العلامة المائية للمحرك هي 2:55 مساء لأنها تطرح 15 دقيقة (التأخير) من الحد الأقصى للوقت المنظر (3:10 مساء). يحتوي عامل تجميع البث على ما يلي في حالته:

  • [2pm, 3pm]: 25 دولارا
  • [3pm, 4pm]: 30 دولارا

يوضح الجدول التالي ما قد يحدث في كل وضع إخراج:

وضع الإخراج النتيجة والسبب
إلحاق لا يصدر عامل تجميع البث أي شيء في المراحل النهائية. ويرجع ذلك إلى أن هاتين النافذتين قد تتغيران مع ظهور قيم جديدة مع مشغل لاحق: تشير العلامة المائية 2:55 مساء إلى أن السجلات بعد الساعة 2:55 مساء قد لا تزال تصل، وقد تقع هذه السجلات في [2pm, 3pm] النافذة أو [3pm, 4pm] النافذة.
Update يصدر عامل التشغيل كلا السجلين، لأن كلا السجلين تلقيا تحديثات.
إكمال يصدر عامل التشغيل كافة السجلات.

الآن، افترض أن الدفق يتلقى سجلا آخر:

  • 20 دولار في الساعة 3:20 مساء

يتم تحديث العلامة المائية إلى الساعة 3:05 مساء لأن المحرك يطرح 15 دقيقة من الساعة 3:20 مساء. عند هذه النقطة، يحتوي عامل تشغيل تجميع البث على ما يلي في حالته:

  • [2pm, 3pm]: 25 دولارا
  • [3pm, 4pm]: 50 دولارا

يوضح الجدول التالي ما قد يحدث في كل وضع إخراج:

وضع الإخراج النتيجة والسبب
إلحاق يلاحظ عامل تجميع الدفق العلامة المائية 3:05 مساء أكبر من نهاية [2pm, 3pm] النافذة. من خلال تعريف العلامة المائية، لم يعد من الممكن تغيير هذه النافذة، لذلك تنبعث منها [2pm, 3pm] النافذة.
Update يصدر عامل تجميع [3pm, 4pm] البث النافذة لأن قيمة الحالة قد تغيرت من 30 دولارا إلى 50 دولارا.
إكمال يصدر عامل التشغيل كافة السجلات.

يلخص ما يلي كيفية تصرف عوامل التشغيل ذات الحالة في كل وضع إلحاق:

  • في وضع الإلحاق، اكتب السجلات مرة واحدة بعد تأخير العلامة المائية.
  • في وضع التحديث، اكتب السجلات التي تغيرت منذ المشغل السابق.
  • في الوضع الكامل، اكتب جميع السجلات التي تم إنتاجها من قبل عامل التشغيل ذي الحالة.