استخدام foreachBatch للكتابة إلى متلقي البيانات العشوائية

تتناول هذه المقالة استخدام foreachBatch مع Structured Streaming لكتابة إخراج استعلام دفق إلى مصادر البيانات التي لا تحتوي على مصدر تدفق موجود.

يسمح لك نمط streamingDF.writeStream.foreachBatch(...) التعليمات البرمجية بتطبيق وظائف الدفعة على بيانات الإخراج لكل دفعة صغيرة من استعلام الدفق. الدوال المستخدمة مع foreachBatch أخذ معلمتين:

  • DataFrame يحتوي على بيانات إخراج دفعة صغيرة.
  • المعرف الفريد للدفعة الصغيرة.

يجب استخدام foreachBatch لعمليات دمج Delta Lake في Structured Streaming. راجع Upsert من استعلامات الدفق باستخدام foreachBatch.

تطبيق عمليات DataFrame إضافية

لا يتم دعم العديد من عمليات DataFrame وDataset في دفق DataFrames لأن Spark لا يدعم إنشاء خطط تزايدية في هذه الحالات. foreachBatch() يمكنك استخدام تطبيق بعض هذه العمليات على كل إخراج دفعة صغيرة. على سبيل المثال، يمكنك استخدام foreachBath() عملية SQL MERGE INTO وكتابة إخراج تجميعات الدفق في جدول Delta في وضع التحديث. راجع المزيد من التفاصيل في MERGE INTO.

هام

  • foreachBatch() يوفر ضمانات كتابة مرة واحدة على الأقل فقط. ومع ذلك، يمكنك استخدام batchId المقدمة إلى الدالة كطريقة لإلغاء تكرار الإخراج والحصول على ضمان مرة واحدة بالضبط. في كلتا الحالتين، سيكون لديك سبب حول دلالات من طرف إلى طرف بنفسك.
  • foreachBatch()لا يعمل مع وضع المعالجة المستمرة لأنه يعتمد بشكل أساسي على تنفيذ الدفعة الصغيرة لاستعلام دفق. إذا كتبت البيانات في الوضع المستمر، فاستخدم foreach() بدلا من ذلك.

يمكن استدعاء إطار بيانات فارغ مع foreachBatch() ويجب أن تكون التعليمات البرمجية للمستخدم مرنة للسماح بالعملية المناسبة. يظهر مثال هنا:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

تغييرات السلوك في foreachBatch Databricks Runtime 14.0

في Databricks Runtime 14.0 وما فوق على الحساب المكون مع وضع الوصول المشترك، forEachBatch يعمل في عملية Python منفصلة منفصلة على Apache Spark، بدلا من بيئة REPL. يتم تسلسله ودفعه إلى Spark وليس لديه حق الوصول إلى الكائنات العمومية spark طوال مدة الجلسة.

في جميع تكوينات الحوسبة الأخرى، foreachBatch يعمل في نفس Python REPL الذي يقوم بتشغيل بقية التعليمات البرمجية الخاصة بك. ونتيجة لذلك، لا يتم تسلسل الدالة.

عند استخدام Databricks Runtime 14.0 وما فوق على الحساب المكون مع وضع الوصول المشترك، يجب استخدام sparkSession المتغير المحدد في DataFrame المحلي عند استخدام foreachBatch في Python، كما هو الحال في مثال التعليمات البرمجية التالي:

def example_function(df, batch_id):
  df.sparkSession.sql("<query>")

تنطبق تغييرات السلوك التالية:

  • لا يمكنك الوصول إلى أي متغيرات Python عمومية من داخل وظيفتك.
  • print() تكتب الأوامر الإخراج إلى سجلات برنامج التشغيل.
  • يجب أن تكون أي ملفات أو وحدات أو كائنات مشار إليها في الدالة قابلة للتسلسل ومتاحة على Spark.

إعادة استخدام مصادر بيانات الدفعات الموجودة

باستخدام foreachBatch()، يمكنك استخدام كتاب بيانات الدفعات الحاليين لمتلقي البيانات التي قد لا تحتوي على دعم Structured Streaming. فيما يلي بعض الأمثلة على ذلك:

يمكن استخدام العديد من مصادر بيانات الدفعات الأخرى من foreachBatch(). راجع الاتصال لمصادر البيانات.

الكتابة إلى مواقع متعددة

إذا كنت بحاجة إلى كتابة إخراج استعلام دفق إلى مواقع متعددة، توصي Databricks باستخدام العديد من كتاب Structured Streaming للحصول على أفضل توازي ومعدل نقل.

يؤدي استخدام foreachBatch الكتابة إلى متلقيات متعددة إلى تسلسل تنفيذ عمليات الكتابة المتدفقة، ما يمكن أن يزيد من زمن الانتقال لكل دفعة صغيرة.

إذا كنت تستخدم foreachBatch للكتابة إلى جداول Delta متعددة، فشاهد كتابة الجدول المتكرر في foreachBatch.