تحسين المعالجة ذات الحالة في Delta Live Tables باستخدام العلامات المائية

لإدارة البيانات المحفوظة في الحالة بشكل فعال، استخدم العلامات المائية عند إجراء معالجة دفق ذات حالة في جداول Delta Live، بما في ذلك التجميعات والصلات وإلغاء التكرار. توضح هذه المقالة كيفية استخدام العلامات المائية في استعلامات Delta Live Tables وتتضمن أمثلة للعمليات الموصى بها.

إشعار

لضمان معالجة الاستعلامات التي تقوم بالتجميعات بشكل متزايد وعدم إعادة حسابها بالكامل مع كل تحديث، يجب استخدام العلامات المائية.

ما هي العلامة المائية؟

في معالجة الدفق، العلامة المائية هي ميزة Apache Spark يمكنها تحديد حد زمني لمعالجة البيانات عند تنفيذ عمليات ذات حالة مثل التجميعات. تتم معالجة البيانات التي تصل إلى حتى يتم الوصول إلى الحد، وعند هذه النقطة يتم إغلاق النافذة الزمنية المحددة بواسطة الحد. يمكن استخدام العلامات المائية لتجنب المشكلات أثناء معالجة الاستعلام، بشكل رئيسي عند معالجة مجموعات بيانات أكبر أو معالجة طويلة الأمد. يمكن أن تتضمن هذه المشاكل زمن انتقال عال في إنتاج النتائج وحتى أخطاء نفاد الذاكرة (OOM) بسبب كمية البيانات المحفوظة في الحالة أثناء المعالجة. نظرا لأن البيانات المتدفقة غير مرتبة بطبيعتها، تدعم العلامات المائية أيضا حساب العمليات بشكل صحيح مثل تجميعات النوافذ الزمنية.

لمعرفة المزيد حول استخدام العلامات المائية في معالجة الدفق، راجع العلامات المائية في Apache Spark Structured Streaming وتطبيق العلامات المائية للتحكم في حدود معالجة البيانات.

كيف يمكنك تحديد علامة مائية؟

يمكنك تعريف علامة مائية عن طريق تحديد حقل طابع زمني وقيمة تمثل الحد الزمني للبيانات المتأخرة للوصول. تعتبر البيانات متأخرة إذا وصلت بعد حد الوقت المحدد. على سبيل المثال، إذا تم تعريف الحد على أنه 10 دقائق، فقد يتم إسقاط السجلات التي تصل بعد حد 10 دقائق.

نظرا لأنه قد يتم إسقاط السجلات التي تصل بعد الحد المحدد، فمن المهم تحديد حد يفي بمتطلبات زمن الانتقال مقابل متطلبات التصحيح. يؤدي اختيار حد أصغر إلى إرسال السجلات في وقت أقرب ولكن يعني أيضا أنه من المرجح أن يتم إسقاط السجلات المتأخرة. الحد الأكبر يعني الانتظار لفترة أطول ولكن ربما أكثر اكتمالا للبيانات. بسبب حجم الحالة الأكبر، قد يتطلب الحد الأكبر أيضا موارد حوسبة إضافية. نظرا لأن قيمة الحد تعتمد على متطلبات البيانات والمعالجة الخاصة بك، فإن اختبار ومراقبة المعالجة الخاصة بك أمر مهم لتحديد الحد الأمثل.

يمكنك استخدام الدالة withWatermark() في Python لتعريف علامة مائية. في SQL، استخدم WATERMARK عبارة لتعريف علامة مائية:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

استخدام العلامات المائية مع الصلات الدفقية

بالنسبة إلى عمليات ربط الدفق، يجب تحديد علامة مائية على جانبي الصلة وعبارة فاصل زمني. نظرا لأن كل مصدر صلة لديه طريقة عرض غير مكتملة للبيانات، فإن عبارة الفاصل الزمني مطلوبة لإخبار محرك الدفق عندما لا يمكن إجراء أي تطابقات أخرى. يجب أن تستخدم عبارة الفاصل الزمني نفس الحقول المستخدمة لتعريف العلامات المائية.

نظرا إلى أنه قد تكون هناك أوقات يتطلب فيها كل دفق حدودا مختلفة للعلامات المائية، لا تحتاج التدفقات إلى نفس الحدود. لتجنب فقدان البيانات، يحتفظ محرك الدفق بعلامة مائية عالمية واحدة استنادا إلى أبطأ تدفق.

ينضم المثال التالي إلى دفق مرات الظهور الإعلانية ودفق نقرات المستخدم على الإعلانات. في هذا المثال، يجب أن تحدث نقرة في غضون 3 دقائق من الظهور. بعد مرور الفاصل الزمني لمدة 3 دقائق، يتم إسقاط الصفوف من الحالة التي لم يعد من الممكن مطابقتها.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

تنفيذ التجميعات ذات النوافذ باستخدام العلامات المائية

العملية الشائعة ذات الحالة على دفق البيانات هي التجميع في نافذة. التجميعات ذات النوافذ مشابهة للتجميعات المجمعة، باستثناء أنه يتم إرجاع القيم التجميعية لمجموعة الصفوف التي تعد جزءا من النافذة المعرفة.

يمكن تعريف النافذة على أنها طول معين، ويمكن إجراء عملية تجميع على جميع الصفوف التي تشكل جزءا من تلك النافذة. يدعم Spark Streaming ثلاثة أنواع من النوافذ:

  • النوافذ المتقلبة (الثابتة): سلسلة من الفواصل الزمنية الثابتة الحجم وغير المتراكبة والمتقاربة. ينتمي سجل الإدخال إلى نافذة واحدة فقط.
  • النوافذ المنزلقة: على غرار النوافذ المتقلبة، تكون النوافذ المنزلقة ثابتة الحجم، ولكن يمكن أن تتداخل النوافذ، ويمكن أن يقع السجل في نوافذ متعددة.

عند وصول البيانات بعد نهاية النافذة بالإضافة إلى طول العلامة المائية، لا يتم قبول أي بيانات جديدة للنافذة، ويتم إصدار نتيجة التجميع، ويتم إسقاط حالة النافذة.

يحسب المثال التالي مجموع مرات الظهور كل 5 دقائق باستخدام نافذة ثابتة. في هذا المثال، تستخدم عبارة التحديد الاسم المستعار impressions_window، ثم يتم تعريف النافذة نفسها كجزء من العبارة GROUP BY . يجب أن تستند النافذة إلى نفس عمود الطابع الزمني مثل العلامة المائية clickTimestamp ، العمود في هذا المثال.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

مثال مشابه في Python لحساب الربح على النوافذ الثابتة بالساعة:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    dlt.read_stream("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

إلغاء تكرار سجلات الدفق

يحتوي الدفق المنظم على ضمانات معالجة لمرة واحدة بالضبط ولكنه لا يقوم تلقائيا بإلغاء تكرار السجلات من مصادر البيانات. على سبيل المثال، نظرا لأن العديد من قوائم انتظار الرسائل تحتوي على ضمانات مرة واحدة على الأقل، يجب توقع السجلات المكررة عند القراءة من إحدى قوائم انتظار الرسائل هذه. يمكنك استخدام الدالة dropDuplicatesWithinWatermark() لإلغاء تكرار السجلات في أي حقل محدد، وإزالة التكرارات من دفق حتى إذا كانت بعض الحقول تختلف (مثل وقت الحدث أو وقت الوصول). يجب تحديد علامة مائية لاستخدام الدالة dropDuplicatesWithinWatermark() . يتم إسقاط جميع البيانات المكررة التي تصل ضمن النطاق الزمني المحدد بواسطة العلامة المائية.

البيانات المطلوبة مهمة لأن البيانات غير المطلوبة تتسبب في انتقال قيمة العلامة المائية إلى الأمام بشكل غير صحيح. بعد ذلك، عند وصول البيانات القديمة، يتم اعتبارها متأخرة وتم إسقاطها. withEventTimeOrder استخدم الخيار لمعالجة اللقطة الأولية بالترتيب استنادا إلى الطابع الزمني المحدد في العلامة المائية. withEventTimeOrder يمكن تعريف الخيار في التعليمات البرمجية التي تحدد مجموعة البيانات أو في إعدادات البنية الأساسية لبرنامج ربط العمليات التجارية باستخدام spark.databricks.delta.withEventTimeOrder.enabled. على سبيل المثال:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

إشعار

withEventTimeOrder الخيار مدعوم فقط مع Python.

في المثال التالي، تتم معالجة البيانات مرتبة بواسطة clickTimestamp، ويتم إسقاط السجلات التي تصل في غضون 5 ثوان من بعضها البعض والتي تحتوي على تكرار userIdclickAdId وأعمدة.

clicksDedupDf = (
  spark.readStream
    .option("withEventTimeOrder", "true")
    .table(rawClicks)
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

تحسين تكوين البنية الأساسية لبرنامج ربط العمليات التجارية للمعالجة ذات الحالة

للمساعدة في منع مشكلات الإنتاج وزمن الانتقال المفرط، توصي Databricks بتمكين إدارة الحالة المستندة إلى RocksDB لمعالجة الدفق ذات الحالة الخاصة بك، خاصة إذا كانت المعالجة تتطلب توفير كمية كبيرة من الحالة المتوسطة. لتمكين مخزن حالة RocksDB، راجع تمكين مخزن حالة RocksDB ل Delta Live Tables.