مراقبة استعلامات الدفق المنظم على Azure Databricks

يوفر Azure Databricks مراقبة مضمنة لتطبيقات Structured Streaming من خلال واجهة مستخدم Spark ضمن علامة التبويب Streaming .

تمييز استعلامات الدفق المنظم في واجهة مستخدم Spark

قم بتوفير اسم استعلام فريد للتدفقات عن طريق إضافة .queryName(<query-name>) إلى التعليمات البرمجية الخاصة بك writeStream لتمييز المقاييس التي تنتمي إلى أي دفق في واجهة مستخدم Spark بسهولة.

دفع مقاييس Structured Streaming إلى الخدمات الخارجية

يمكن دفع مقاييس الدفق إلى الخدمات الخارجية لحالات استخدام التنبيه أو لوحة المعلومات باستخدام واجهة مستمع استعلام البث من Apache Spark. في Databricks Runtime 11.3 LTS وما فوق، يتوفر Streaming Query Listener في Python وSc scala.

هام

لا يمكن استخدام بيانات الاعتماد والكائنات التي يديرها كتالوج Unity في StreamingQueryListener المنطق.

إشعار

يمكن أن تؤثر معالجة زمن الانتقال المقترن بوحدة الاستماع سلبا على معالجة الاستعلام. توصي Databricks بتصغير منطق المعالجة في وحدات الاستماع هذه والكتابة إلى متلقي زمن الانتقال المنخفض مثل Kafka.

توفر التعليمات البرمجية التالية أمثلة أساسية لبناء الجملة لتنفيذ وحدة استماع:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

تحديد مقاييس يمكن ملاحظتها في Structured Streaming

تسمى المقاييس القابلة للمراقبة الدوال التجميعية العشوائية التي يمكن تعريفها في استعلام (DataFrame). بمجرد أن يصل تنفيذ DataFrame إلى نقطة اكتمال (أي إنهاء استعلام دفعي أو الوصول إلى فترة تدفق)، يتم إصدار حدث مسمى يحتوي على مقاييس البيانات التي تمت معالجتها منذ نقطة الإكمال الأخيرة.

يمكنك مراقبة هذه المقاييس عن طريق إرفاق وحدة استماع بجلسة Spark. تعتمد وحدة الاستماع على وضع التنفيذ:

  • وضع الدفعة: استخدم QueryExecutionListener.

    QueryExecutionListener يتم استدعاء عند اكتمال الاستعلام. الوصول إلى المقاييس باستخدام QueryExecution.observedMetrics الخريطة.

  • الدفق أو الدفعة الصغيرة: استخدم StreamingQueryListener.

    StreamingQueryListener يتم استدعاء عندما يكمل استعلام الدفق فترة. الوصول إلى المقاييس باستخدام StreamingQueryProgress.observedMetrics الخريطة. لا يدعم Azure Databricks دفق التنفيذ المستمر.

على سبيل المثال:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

مقاييس عنصر StreamingQueryListener

مقياس ‏‏الوصف
id معرف الاستعلام الفريد الذي يستمر عبر عمليات إعادة التشغيل. راجع StreamingQuery.id().
runId معرف استعلام فريد لكل بدء أو إعادة تشغيل. راجع StreamingQuery.runId().
name اسم الاستعلام المحدد من قبل المستخدم. قيمة خالية إذا لم يتم تحديدها.
timestamp الطابع الزمني لتنفيذ الدفعة الصغيرة.
batchId معرف فريد للدفعة الحالية من البيانات التي تتم معالجتها. لاحظ أنه في حالة إعادة المحاولة بعد الفشل، يمكن تنفيذ معرف دفعة معين أكثر من مرة. وبالمثل، عندما لا توجد بيانات لمعالجتها، لا يتم زيادة معرف الدفعة.
numInputRows تجميع (عبر جميع المصادر) عدد السجلات التي تمت معالجتها في مشغل.
inputRowsPerSecond إجمالي (عبر جميع المصادر) معدل وصول البيانات.
processedRowsPerSecond معدل التجميع (عبر جميع المصادر) الذي يقوم Spark بمعالجة البيانات عنده.

كائن durationMs

معلومات حول الوقت المستغرق لإكمال المراحل المختلفة من عملية تنفيذ الدفعة الصغيرة.

مقياس ‏‏الوصف
durationMs.addBatch الوقت المستغرق لتنفيذ الميكروباتش. وهذا يستبعد الوقت الذي يستغرقه Spark لتخطيط الميكروباتش.
durationMs.getBatch الوقت المستغرق لاسترداد بيانات التعريف حول الإزاحات من المصدر.
durationMs.latestOffset أحدث إزاحة مستهلكة لل microbatch. يشير كائن التقدم هذا إلى الوقت المستغرق لاسترداد أحدث إزاحة من المصادر.
durationMs.queryPlanning الوقت المستغرق لإنشاء خطة التنفيذ.
durationMs.triggerExecution الوقت المستغرق لتخطيط وتنفيذ الميكروبات.
durationMs.walCommit الوقت المستغرق لتثبيت الإزاحات المتوفرة الجديدة.

كائن eventTime

معلومات حول قيمة وقت الحدث التي تظهر داخل البيانات التي تتم معالجتها في الدفعة الصغيرة. يتم استخدام هذه البيانات بواسطة العلامة المائية لمعرفة كيفية اقتطاع الحالة لمعالجة التجميعات ذات الحالة المحددة في مهمة Structured Streaming.

مقياس ‏‏الوصف
eventTime.avg متوسط وقت الحدث الذي يظهر في المشغل.
eventTime.max الحد الأقصى لوقت الحدث الذي يظهر في المشغل.
eventTime.min الحد الأدنى لوقت الحدث الذي يظهر في المشغل.
eventTime.watermark قيمة العلامة المائية المستخدمة في المشغل.

كائن stateOperators

معلومات حول العمليات ذات الحالة المحددة في مهمة Structured Streaming والتجميعات التي يتم إنتاجها منها.

مقياس ‏‏الوصف
stateOperators.operatorName اسم عامل التشغيل ذي الحالة التي تتعلق بها المقاييس. على سبيل المثال، symmetricHashJoin، dedupe، stateStoreSave.
stateOperators.numRowsTotal عدد الصفوف في الحالة نتيجة عامل التشغيل أو التجميع ذي الحالة.
stateOperators.numRowsUpdated عدد الصفوف التي تم تحديثها في الحالة نتيجة عامل التشغيل أو التجميع ذي الحالة.
stateOperators.numRowsRemoved عدد الصفوف التي تمت إزالتها من الحالة نتيجة عامل التشغيل أو التجميع ذي الحالة.
stateOperators.commitTimeMs الوقت المستغرق لتثبيت جميع التحديثات (وضع وإزالات) وإرجاع إصدار جديد.
stateOperators.memoryUsedBytes الذاكرة المستخدمة من قبل مخزن الحالة.
stateOperators.numRowsDroppedByWatermark عدد الصفوف التي تعتبر متأخرة جدا بحيث لا يمكن تضمينها في التجميع ذي الحالة. تجميعات الدفق فقط: عدد الصفوف التي تم إسقاطها بعد التجميع، وليس صفوف الإدخال الخام. الرقم غير دقيق، ولكن يمكن أن يشير إلى أنه يتم إسقاط البيانات المتأخرة.
stateOperators.numShufflePartitions عدد أقسام التبديل العشوائي لهذا العامل ذي الحالة.
stateOperators.numStateStoreInstances مثيل مخزن الحالة الفعلي الذي قام عامل التشغيل بتهيئةه وصيانته. في العديد من عوامل التشغيل ذات الحالة، هذا هو نفس عدد الأقسام، ولكن ربط دفق الدفق يقوم بتهيئة أربعة مثيلات لمخزن الحالة لكل قسم.

كائن stateOperators.customMetrics

المعلومات التي يتم جمعها من RocksDB التي تلتقط مقاييس حول أدائها وعملياتها فيما يتعلق بالقيم ذات الحالة التي تحتفظ بها لوظيفة Structured Streaming. لمزيد من المعلومات، راجع تكوين مخزن حالة RocksDB على Azure Databricks.

مقياس ‏‏الوصف
customMetrics.rocksdbBytesCopied عدد وحدات البايت المنسخة كما تم تعقبها بواسطة RocksDB File Manager.
customMetrics.rocksdbCommitCheckpointLatency الوقت بالمللي ثانية لأخذ لقطة من RocksDB الأصلية وكتابتها في دليل محلي.
customMetrics.rocksdbCompactLatency الوقت بالمللي ثانية للضغط (اختياري) أثناء تثبيت نقطة التحقق.
customMetrics.rocksdbCommitFileSyncLatencyMs الوقت بالمللي ثانية لمزامنة الملفات الأصلية المتعلقة بلقطة RocksDB إلى تخزين خارجي (موقع نقطة التحقق).
customMetrics.rocksdbCommitFlushLatency الوقت بالمللي ثانية لمسح تغييرات RocksDB في الذاكرة على القرص المحلي.
customMetrics.rocksdbCommitPauseLatency الوقت بالمللي ثانية لإيقاف مؤشرات ترابط عامل الخلفية (على سبيل المثال، للضغط) كجزء من تثبيت نقطة التحقق.
customMetrics.rocksdbCommitWriteBatchLatency الوقت بالمللي ثانية لتطبيق عمليات الكتابة المرحلية في بنية الذاكرة (WriteBatch) على RocksDB الأصلي.
customMetrics.rocksdbFilesCopied عدد الملفات التي تم نسخها كما تم تعقبها بواسطة RocksDB File Manager.
customMetrics.rocksdbFilesReused عدد الملفات التي تمت إعادة استخدامها كما تم تعقبها بواسطة RocksDB File Manager.
customMetrics.rocksdbGetCount عدد get الاستدعاءات إلى DB (لا يتضمن gets ذلك من WriteBatch: دفعة في الذاكرة المستخدمة في عمليات الكتابة المرحلية).
customMetrics.rocksdbGetLatency متوسط الوقت بالنانوية للمكالمة الأصلية RocksDB::Get الأساسية.
customMetrics.rocksdbReadBlockCacheHitCount كم من ذاكرة التخزين المؤقت للكتلة في RocksDB مفيدة أم لا وتجنب قراءات القرص المحلي.
customMetrics.rocksdbReadBlockCacheMissCount كم من ذاكرة التخزين المؤقت للكتلة في RocksDB مفيدة أم لا وتجنب قراءات القرص المحلي.
customMetrics.rocksdbSstFileSize حجم جميع ملفات SST. تشير SST إلى جدول فرز ثابت، وهو البنية الجدولية التي يستخدمها RocksDB لتخزين البيانات.
customMetrics.rocksdbTotalBytesRead عدد وحدات البايت غير المضغوطة التي تقرأها get العمليات.
customMetrics.rocksdbTotalBytesReadByCompaction عدد وحدات البايت التي تقرأها عملية الضغط من القرص.
customMetrics.rocksdbTotalBytesReadThroughIterator تتطلب بعض العمليات ذات الحالة (على سبيل المثال، معالجة المهلة في FlatMapGroupsWithState والعلامات المائية) قراءة البيانات في DB من خلال مكرر. يمثل هذا المقياس حجم البيانات غير المضغوطة المقروءة باستخدام المكرر.
customMetrics.rocksdbTotalBytesWritten عدد وحدات البايت غير المضغوطة المكتوبة بواسطة put العمليات.
customMetrics.rocksdbTotalBytesWrittenByCompaction عدد وحدات البايت التي تكتبها عملية الضغط على القرص.
customMetrics.rocksdbTotalCompactionLatencyMs ميلي ثانية من الوقت لضغطات RocksDB، بما في ذلك ضغطات الخلفية والضغط الاختياري الذي بدأ أثناء التثبيت.
customMetrics.rocksdbTotalFlushLatencyMs وقت المسح، بما في ذلك مسح الخلفية. عمليات المسح هي العمليات التي يتم من خلالها مسح MemTable إلى التخزين بمجرد امتلاءه. MemTables هي المستوى الأول حيث يتم تخزين البيانات في RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed يدير RocksDB File Manager استخدام مساحة قرص ملف SST الفعلية وحذفها. يمثل هذا المقياس الملفات المضغوطة غير المضغوطة بالبايت كما تم الإبلاغ عنها بواسطة إدارة الملفات.

كائن المصادر (Kafka)

مقياس ‏‏الوصف
sources.description اسم المصدر الذي يقرأ منه استعلام البث. على سبيل المثال، “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”
sources.startOffset العنصر بدء رقم الإزاحة داخل موضوع Kafka الذي بدأت مهمة الدفق فيه.
sources.endOffset العنصر أحدث إزاحة تمت معالجتها بواسطة الميكروباتش. يمكن أن يكون هذا مساويا latestOffset لتنفيذ الميكروبات الجارية.
sources.latestOffset العنصر أحدث إزاحة تم رسمها بواسطة الميكروباتش. عندما يكون هناك تقييد، قد لا تعالج عملية الدفعات الصغيرة جميع الإزاحات، مما endOffset يسبب ويختلف latestOffset .
sources.numInputRows عدد صفوف الإدخال التي تمت معالجتها من هذا المصدر.
sources.inputRowsPerSecond معدل وصول البيانات للمعالجة لهذا المصدر.
sources.processedRowsPerSecond معدل معالجة Spark للبيانات لهذا المصدر.

عنصر sources.metrics (Kafka)

مقياس ‏‏الوصف
sources.metrics.avgOffsetsBehindLatest متوسط عدد الإزاحات التي يكون الاستعلام المتدفق خلف أحدث إزاحة متاحة بين جميع الموضوعات المشتركة.
sources.metrics.estimatedTotalBytesBehindLatest العدد المقدر لوحدات البايت التي لم تستهلكها عملية الاستعلام من الموضوعات المشتركة.
sources.metrics.maxOffsetsBehindLatest الحد الأقصى لعدد الإزاحات التي يكون الاستعلام المتدفق خلف أحدث إزاحة متاحة بين جميع الموضوعات المشتركة.
sources.metrics.minOffsetsBehindLatest الحد الأدنى لعدد الإزاحات التي يكون الاستعلام المتدفق خلف أحدث إزاحة متاحة بين جميع الموضوعات المشتركة.

كائن المتلقي (Kafka)

مقياس ‏‏الوصف
sink.description اسم المتلقي الذي يكتب إليه استعلام البث. على سبيل المثال، “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”
sink.numOutputRows عدد الصفوف التي تمت كتابتها إلى جدول الإخراج أو المتلقي كجزء من الميكروباتش. بالنسبة لبعض الحالات، يمكن أن تكون هذه القيمة "-1" ويمكن تفسيرها بشكل عام على أنها "غير معروفة".

كائن المصادر (Delta Lake)

مقياس ‏‏الوصف
sources.description اسم المصدر الذي يقرأ منه استعلام البث. على سبيل المثال، “DeltaSource[table]”
sources.[startOffset/endOffset].sourceVersion إصدار التسلسل الذي يتم ترميز هذه الإزاحة به.
sources.[startOffset/endOffset].reservoirId معرف الجدول الذي تقرأ منه. يتم استخدام هذا للكشف عن التكوين الخاطئ عند إعادة تشغيل استعلام.
sources.[startOffset/endOffset].reservoirVersion إصدار الجدول الذي تقوم بمعالجته حاليا.
sources.[startOffset/endOffset].index الفهرس في تسلسل AddFiles في هذا الإصدار. يتم استخدام هذا لتقسيم التثبيتات الكبيرة إلى دفعات متعددة. يتم إنشاء هذا الفهرس عن طريق الفرز على modificationTimestamp و path.
sources.[startOffset/endOffset].isStartingVersion ما إذا كانت هذه الإزاحة تشير إلى استعلام يبدأ بدلا من معالجة التغييرات. عند بدء استعلام جديد، تتم معالجة جميع البيانات الموجودة في الجدول في البداية، ثم تتم معالجة البيانات الجديدة التي وصلت.
sources.latestOffset أحدث إزاحة تمت معالجتها بواسطة استعلام microbatch.
sources.numInputRows عدد صفوف الإدخال التي تمت معالجتها من هذا المصدر.
sources.inputRowsPerSecond معدل وصول البيانات للمعالجة لهذا المصدر.
sources.processedRowsPerSecond معدل معالجة Spark للبيانات لهذا المصدر.
sources.metrics.numBytesOutstanding حجم الملفات المعلقة (الملفات التي تتبعها RocksDB) مجتمعة. هذا هو مقياس تراكم دلتا والتحميل التلقائي كمصدر الدفق.
sources.metrics.numFilesOutstanding عدد الملفات المعلقة التي ستتم معالجتها. هذا هو مقياس تراكم دلتا والتحميل التلقائي كمصدر الدفق.

كائن المتلقي (Delta Lake)

مقياس ‏‏الوصف
sink.description اسم المتلقي الذي يكتب الاستعلام المتدفق إليه. على سبيل المثال، “DeltaSink[table]”
sink.numOutputRows عدد الصفوف في هذا المقياس هو "-1" لأن Spark لا يمكنه استنتاج صفوف الإخراج لأحواض DSv1، وهو تصنيف متلقي Delta Lake.

الأمثلة

مثال حدث Kafka-to-Kafka StreamingQueryListener

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

مثال حدث Delta Lake-to-Delta Lake StreamingQueryListener

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

مثال على مصدر السعر لحدث Delta Lake StreamingQueryListener

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}