تطبيق العلامات المائية للتحكم في حدود معالجة البيانات

تقدم هذه المقالة المفاهيم الأساسية للعلامات المائية وتوفر توصيات لاستخدام العلامات المائية في عمليات الدفق الشائعة ذات الحالة. يجب تطبيق العلامات المائية على عمليات الدفق ذات الحالة لتجنب توسيع كمية البيانات المحفوظة في الحالة بشكل لا نهائي، مما قد يؤدي إلى حدوث مشكلات في الذاكرة وزيادة زمن انتقال المعالجة أثناء عمليات الدفق طويلة الأمد.

ما هي العلامة المائية؟

يستخدم الدفق المنظم العلامات المائية للتحكم في الحد الزمني لمتابعة معالجة التحديثات لكيان حالة معين. تتضمن الأمثلة الشائعة لكيانات الحالة ما يلي:

  • التجميعات عبر نافذة زمنية.
  • مفاتيح فريدة في صلة بين دفقين.

عند الإعلان عن علامة مائية، يمكنك تحديد حقل طابع زمني وحد علامة مائية على DataFrame متدفق. عند وصول بيانات جديدة، يتتبع مدير الحالة الطابع الزمني الأحدث في الحقل المحدد ويعالج جميع السجلات ضمن حد زمن الانتقال.

يطبق المثال التالي حد العلامة المائية لمدة 10 دقائق على عدد النوافذ:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

في هذا المثال:

  • event_time يتم استخدام العمود لتعريف علامة مائية مدتها 10 دقائق ونافذة متقلبة مدتها 5 دقائق.
  • يتم جمع عدد لكل id نوافذ غير متداخلة لمدة 5 دقائق.
  • يتم الاحتفاظ بمعلومات الحالة لكل عدد حتى نهاية النافذة أقدم ب 10 دقائق من أحدث ما تمت ملاحظته event_time.

هام

تضمن حدود العلامة المائية معالجة السجلات التي تصل ضمن الحد المحدد وفقا لدلالات الاستعلام المحدد. قد لا تزال تتم معالجة السجلات المتأخرة التي تصل خارج الحد المحدد باستخدام مقاييس الاستعلام، ولكن هذا غير مضمون.

كيف تؤثر العلامات المائية على وقت المعالجة ومعدل النقل؟

تتفاعل العلامات المائية مع أوضاع الإخراج للتحكم في وقت كتابة البيانات إلى المتلقي. نظرا لأن العلامات المائية تقلل من إجمالي كمية معلومات الحالة التي سيتم معالجتها، فإن الاستخدام الفعال للعلامات المائية أمر ضروري لفعالية معدل نقل الدفق ذي الحالة.

إشعار

لا يتم دعم جميع أوضاع الإخراج لجميع العمليات ذات الحالة.

العلامات المائية ووضع الإخراج للتجميعات ذات النوافذ

يوضح الجدول التالي تفاصيل معالجة الاستعلامات مع التجميع على طابع زمني بعلامة مائية محددة:

وضع الإخراج سلوك
إلحاق تتم كتابة الصفوف إلى الجدول الهدف بمجرد تمرير حد العلامة المائية. يتم تأخير كافة عمليات الكتابة استنادا إلى حد التأخر. يتم إسقاط حالة التجميع القديمة بمجرد تجاوز الحد.
Update تتم كتابة الصفوف إلى الجدول الهدف حيث يتم حساب النتائج، ويمكن تحديثها والكتابة فوقها عند وصول بيانات جديدة. يتم إسقاط حالة التجميع القديمة بمجرد تجاوز الحد.
إكمال لا يتم إسقاط حالة التجميع. تتم إعادة كتابة الجدول الهدف مع كل مشغل.

العلامات المائية والإخراج لصلات دفق الدفق

تدعم الصلات بين تدفقات متعددة وضع الإلحاق فقط، وتتم كتابة السجلات المتطابقة في كل دفعة يتم اكتشافها. بالنسبة إلى الصلات الداخلية، توصي Databricks بتعيين حد علامة مائية على كل مصدر بيانات دفق. يسمح هذا بتجاهل معلومات الحالة للسجلات القديمة. بدون علامات مائية، يحاول Structured Streaming الانضمام إلى كل مفتاح من جانبي الصلة مع كل مشغل.

يحتوي الدفق المنظم على دلالات خاصة لدعم الصلات الخارجية. العلامة المائية إلزامية للصلات الخارجية، كما تشير إلى متى يجب كتابة مفتاح بقيمة خالية بعد عدم المتطابقة. لاحظ أنه على الرغم من أن الصلات الخارجية يمكن أن تكون مفيدة لتسجيل السجلات التي لا تتم مطابقتها أبدا أثناء معالجة البيانات، لأن الصلات تكتب فقط إلى الجداول ك عمليات إلحاق، لا يتم تسجيل هذه البيانات المفقودة إلا بعد مرور حد زمن الانتقال.

التحكم في حد البيانات المتأخرة باستخدام نهج العلامة المائية المتعددة في Structured Streaming

عند العمل مع مدخلات Structured Streaming متعددة، يمكنك تعيين علامات مائية متعددة للتحكم في حدود التسامح للبيانات المتأخرة. يسمح لك تكوين العلامات المائية بالتحكم في معلومات الحالة ويؤثر على زمن الانتقال.

يمكن أن يحتوي استعلام الدفق على تدفقات إدخال متعددة يتم توحيدها أو ضمها معا. يمكن أن يكون لكل من تدفقات الإدخال عتبة مختلفة من البيانات المتأخرة التي تحتاج إلى التسامح مع العمليات ذات الحالة. حدد هذه الحدود باستخدام withWatermarks("eventTime", delay) على كل تدفق من تدفقات الإدخال. فيما يلي مثال على استعلام مع عمليات ربط دفق البيانات.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

أثناء تشغيل الاستعلام، يتتبع Structured Streaming بشكل فردي الحد الأقصى لوقت الحدث الذي يظهر في كل دفق إدخال، ويحسب العلامات المائية بناء على التأخير المقابل، ويختار علامة مائية عمومية واحدة معها لاستخدامها في العمليات ذات الحالة. بشكل افتراضي، يتم اختيار الحد الأدنى كعلامة مائية عمومية لأنه يضمن عدم إسقاط أي بيانات عن طريق الخطأ بعد فوات الأوان إذا كان أحد التدفقات يقع خلف التدفقات الأخرى (على سبيل المثال، توقف أحد التدفقات عن تلقي البيانات بسبب فشل المنبع). بمعنى آخر، تتحرك العلامة المائية العمومية بأمان بوتيرة أبطأ تدفق ويتم تأخير إخراج الاستعلام وفقا لذلك.

إذا كنت تريد الحصول على نتائج أسرع، يمكنك تعيين نهج العلامة المائية المتعددة لاختيار القيمة القصوى كعلامة مائية عمومية عن طريق تعيين تكوين spark.sql.streaming.multipleWatermarkPolicy SQL إلى max (الافتراضي هو min). وهذا يتيح للعلامة المائية العمومية التحرك بوتيرة أسرع تدفق. ومع ذلك، يسقط هذا التكوين البيانات من أبطأ التدفقات. ولهذا السبب، توصي Databricks باستخدام هذا التكوين بحكمة.

إسقاط التكرارات داخل العلامة المائية

في Databricks Runtime 13.3 LTS وما فوق، يمكنك إلغاء تكرار السجلات ضمن حد العلامة المائية باستخدام معرف فريد.

يوفر الدفق المنظم ضمانات معالجة لمرة واحدة بالضبط، ولكنه لا يلغي تكرار السجلات تلقائيا من مصادر البيانات. يمكنك استخدام dropDuplicatesWithinWatermark لإلغاء تكرار السجلات في أي حقل محدد، مما يسمح لك بإزالة التكرارات من الدفق حتى إذا كانت بعض الحقول تختلف (مثل وقت الحدث أو وقت الوصول).

يضمن إسقاط السجلات المكررة التي تصل داخل العلامة المائية المحددة. هذا الضمان صارم في اتجاه واحد فقط، وقد يتم أيضا إسقاط السجلات المكررة التي تصل خارج الحد المحدد. يجب تعيين حد التأخير للعلامة المائية لفترة أطول من الحد الأقصى لاختلافات الطابع الزمني بين الأحداث المكررة لإزالة جميع التكرارات.

يجب تحديد علامة مائية لاستخدام dropDuplicatesWithinWatermark الأسلوب، كما في المثال التالي:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])