مراقبة استعلامات الدفق المنظم على Azure Databricks
يوفر Azure Databricks مراقبة مضمنة لتطبيقات Structured Streaming من خلال واجهة مستخدم Spark ضمن علامة التبويب Streaming .
تمييز استعلامات الدفق المنظم في واجهة مستخدم Spark
قم بتوفير اسم استعلام فريد للتدفقات عن طريق إضافة .queryName(<query-name>)
إلى التعليمات البرمجية الخاصة بك writeStream
لتمييز المقاييس التي تنتمي إلى أي دفق في واجهة مستخدم Spark بسهولة.
دفع مقاييس Structured Streaming إلى الخدمات الخارجية
يمكن دفع مقاييس الدفق إلى الخدمات الخارجية لحالات استخدام التنبيه أو لوحة المعلومات باستخدام واجهة مستمع استعلام البث من Apache Spark. في Databricks Runtime 11.3 LTS وما فوق، يتوفر Streaming Query Listener في Python وSc scala.
هام
لا يمكن استخدام بيانات الاعتماد والكائنات التي يديرها كتالوج Unity في StreamingQueryListener
المنطق.
إشعار
يمكن أن تؤثر معالجة زمن الانتقال المقترن بوحدة الاستماع سلبا على معالجة الاستعلام. توصي Databricks بتصغير منطق المعالجة في وحدات الاستماع هذه والكتابة إلى متلقي زمن الانتقال المنخفض مثل Kafka.
توفر التعليمات البرمجية التالية أمثلة أساسية لبناء الجملة لتنفيذ وحدة استماع:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
تحديد مقاييس يمكن ملاحظتها في Structured Streaming
تسمى المقاييس القابلة للمراقبة الدوال التجميعية العشوائية التي يمكن تعريفها في استعلام (DataFrame). بمجرد أن يصل تنفيذ DataFrame إلى نقطة اكتمال (أي إنهاء استعلام دفعي أو الوصول إلى فترة تدفق)، يتم إصدار حدث مسمى يحتوي على مقاييس البيانات التي تمت معالجتها منذ نقطة الإكمال الأخيرة.
يمكنك مراقبة هذه المقاييس عن طريق إرفاق وحدة استماع بجلسة Spark. تعتمد وحدة الاستماع على وضع التنفيذ:
وضع الدفعة: استخدم
QueryExecutionListener
.QueryExecutionListener
يتم استدعاء عند اكتمال الاستعلام. الوصول إلى المقاييس باستخدامQueryExecution.observedMetrics
الخريطة.الدفق أو الدفعة الصغيرة: استخدم
StreamingQueryListener
.StreamingQueryListener
يتم استدعاء عندما يكمل استعلام الدفق فترة. الوصول إلى المقاييس باستخدامStreamingQueryProgress.observedMetrics
الخريطة. لا يدعم Azure Databricks دفق التنفيذ المستمر.
على سبيل المثال:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
مقاييس عنصر StreamingQueryListener
مقياس | الوصف |
---|---|
id |
معرف الاستعلام الفريد الذي يستمر عبر عمليات إعادة التشغيل. راجع StreamingQuery.id(). |
runId |
معرف استعلام فريد لكل بدء أو إعادة تشغيل. راجع StreamingQuery.runId(). |
name |
اسم الاستعلام المحدد من قبل المستخدم. قيمة خالية إذا لم يتم تحديدها. |
timestamp |
الطابع الزمني لتنفيذ الدفعة الصغيرة. |
batchId |
معرف فريد للدفعة الحالية من البيانات التي تتم معالجتها. لاحظ أنه في حالة إعادة المحاولة بعد الفشل، يمكن تنفيذ معرف دفعة معين أكثر من مرة. وبالمثل، عندما لا توجد بيانات لمعالجتها، لا يتم زيادة معرف الدفعة. |
numInputRows |
تجميع (عبر جميع المصادر) عدد السجلات التي تمت معالجتها في مشغل. |
inputRowsPerSecond |
إجمالي (عبر جميع المصادر) معدل وصول البيانات. |
processedRowsPerSecond |
معدل التجميع (عبر جميع المصادر) الذي يقوم Spark بمعالجة البيانات عنده. |
كائن durationMs
معلومات حول الوقت المستغرق لإكمال المراحل المختلفة من عملية تنفيذ الدفعة الصغيرة.
مقياس | الوصف |
---|---|
durationMs.addBatch |
الوقت المستغرق لتنفيذ الميكروباتش. وهذا يستبعد الوقت الذي يستغرقه Spark لتخطيط الميكروباتش. |
durationMs.getBatch |
الوقت المستغرق لاسترداد بيانات التعريف حول الإزاحات من المصدر. |
durationMs.latestOffset |
أحدث إزاحة مستهلكة لل microbatch. يشير كائن التقدم هذا إلى الوقت المستغرق لاسترداد أحدث إزاحة من المصادر. |
durationMs.queryPlanning |
الوقت المستغرق لإنشاء خطة التنفيذ. |
durationMs.triggerExecution |
الوقت المستغرق لتخطيط وتنفيذ الميكروبات. |
durationMs.walCommit |
الوقت المستغرق لتثبيت الإزاحات المتوفرة الجديدة. |
كائن eventTime
معلومات حول قيمة وقت الحدث التي تظهر داخل البيانات التي تتم معالجتها في الدفعة الصغيرة. يتم استخدام هذه البيانات بواسطة العلامة المائية لمعرفة كيفية اقتطاع الحالة لمعالجة التجميعات ذات الحالة المحددة في مهمة Structured Streaming.
مقياس | الوصف |
---|---|
eventTime.avg |
متوسط وقت الحدث الذي يظهر في المشغل. |
eventTime.max |
الحد الأقصى لوقت الحدث الذي يظهر في المشغل. |
eventTime.min |
الحد الأدنى لوقت الحدث الذي يظهر في المشغل. |
eventTime.watermark |
قيمة العلامة المائية المستخدمة في المشغل. |
كائن stateOperators
معلومات حول العمليات ذات الحالة المحددة في مهمة Structured Streaming والتجميعات التي يتم إنتاجها منها.
مقياس | الوصف |
---|---|
stateOperators.operatorName |
اسم عامل التشغيل ذي الحالة التي تتعلق بها المقاييس. على سبيل المثال، symmetricHashJoin ، dedupe ، stateStoreSave . |
stateOperators.numRowsTotal |
عدد الصفوف في الحالة نتيجة عامل التشغيل أو التجميع ذي الحالة. |
stateOperators.numRowsUpdated |
عدد الصفوف التي تم تحديثها في الحالة نتيجة عامل التشغيل أو التجميع ذي الحالة. |
stateOperators.numRowsRemoved |
عدد الصفوف التي تمت إزالتها من الحالة نتيجة عامل التشغيل أو التجميع ذي الحالة. |
stateOperators.commitTimeMs |
الوقت المستغرق لتثبيت جميع التحديثات (وضع وإزالات) وإرجاع إصدار جديد. |
stateOperators.memoryUsedBytes |
الذاكرة المستخدمة من قبل مخزن الحالة. |
stateOperators.numRowsDroppedByWatermark |
عدد الصفوف التي تعتبر متأخرة جدا بحيث لا يمكن تضمينها في التجميع ذي الحالة. تجميعات الدفق فقط: عدد الصفوف التي تم إسقاطها بعد التجميع، وليس صفوف الإدخال الخام. الرقم غير دقيق، ولكن يمكن أن يشير إلى أنه يتم إسقاط البيانات المتأخرة. |
stateOperators.numShufflePartitions |
عدد أقسام التبديل العشوائي لهذا العامل ذي الحالة. |
stateOperators.numStateStoreInstances |
مثيل مخزن الحالة الفعلي الذي قام عامل التشغيل بتهيئةه وصيانته. في العديد من عوامل التشغيل ذات الحالة، هذا هو نفس عدد الأقسام، ولكن ربط دفق الدفق يقوم بتهيئة أربعة مثيلات لمخزن الحالة لكل قسم. |
كائن stateOperators.customMetrics
المعلومات التي يتم جمعها من RocksDB التي تلتقط مقاييس حول أدائها وعملياتها فيما يتعلق بالقيم ذات الحالة التي تحتفظ بها لوظيفة Structured Streaming. لمزيد من المعلومات، راجع تكوين مخزن حالة RocksDB على Azure Databricks.
مقياس | الوصف |
---|---|
customMetrics.rocksdbBytesCopied |
عدد وحدات البايت المنسخة كما تم تعقبها بواسطة RocksDB File Manager. |
customMetrics.rocksdbCommitCheckpointLatency |
الوقت بالمللي ثانية لأخذ لقطة من RocksDB الأصلية وكتابتها في دليل محلي. |
customMetrics.rocksdbCompactLatency |
الوقت بالمللي ثانية للضغط (اختياري) أثناء تثبيت نقطة التحقق. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
الوقت بالمللي ثانية لمزامنة الملفات الأصلية المتعلقة بلقطة RocksDB إلى تخزين خارجي (موقع نقطة التحقق). |
customMetrics.rocksdbCommitFlushLatency |
الوقت بالمللي ثانية لمسح تغييرات RocksDB في الذاكرة على القرص المحلي. |
customMetrics.rocksdbCommitPauseLatency |
الوقت بالمللي ثانية لإيقاف مؤشرات ترابط عامل الخلفية (على سبيل المثال، للضغط) كجزء من تثبيت نقطة التحقق. |
customMetrics.rocksdbCommitWriteBatchLatency |
الوقت بالمللي ثانية لتطبيق عمليات الكتابة المرحلية في بنية الذاكرة (WriteBatch ) على RocksDB الأصلي. |
customMetrics.rocksdbFilesCopied |
عدد الملفات التي تم نسخها كما تم تعقبها بواسطة RocksDB File Manager. |
customMetrics.rocksdbFilesReused |
عدد الملفات التي تمت إعادة استخدامها كما تم تعقبها بواسطة RocksDB File Manager. |
customMetrics.rocksdbGetCount |
عدد get الاستدعاءات إلى DB (لا يتضمن gets ذلك من WriteBatch : دفعة في الذاكرة المستخدمة في عمليات الكتابة المرحلية). |
customMetrics.rocksdbGetLatency |
متوسط الوقت بالنانوية للمكالمة الأصلية RocksDB::Get الأساسية. |
customMetrics.rocksdbReadBlockCacheHitCount |
كم من ذاكرة التخزين المؤقت للكتلة في RocksDB مفيدة أم لا وتجنب قراءات القرص المحلي. |
customMetrics.rocksdbReadBlockCacheMissCount |
كم من ذاكرة التخزين المؤقت للكتلة في RocksDB مفيدة أم لا وتجنب قراءات القرص المحلي. |
customMetrics.rocksdbSstFileSize |
حجم جميع ملفات SST. تشير SST إلى جدول فرز ثابت، وهو البنية الجدولية التي يستخدمها RocksDB لتخزين البيانات. |
customMetrics.rocksdbTotalBytesRead |
عدد وحدات البايت غير المضغوطة التي تقرأها get العمليات. |
customMetrics.rocksdbTotalBytesReadByCompaction |
عدد وحدات البايت التي تقرأها عملية الضغط من القرص. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
تتطلب بعض العمليات ذات الحالة (على سبيل المثال، معالجة المهلة في FlatMapGroupsWithState والعلامات المائية) قراءة البيانات في DB من خلال مكرر. يمثل هذا المقياس حجم البيانات غير المضغوطة المقروءة باستخدام المكرر. |
customMetrics.rocksdbTotalBytesWritten |
عدد وحدات البايت غير المضغوطة المكتوبة بواسطة put العمليات. |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
عدد وحدات البايت التي تكتبها عملية الضغط على القرص. |
customMetrics.rocksdbTotalCompactionLatencyMs |
ميلي ثانية من الوقت لضغطات RocksDB، بما في ذلك ضغطات الخلفية والضغط الاختياري الذي بدأ أثناء التثبيت. |
customMetrics.rocksdbTotalFlushLatencyMs |
وقت المسح، بما في ذلك مسح الخلفية. عمليات المسح هي العمليات التي يتم من خلالها مسح MemTable إلى التخزين بمجرد امتلاءه. MemTables هي المستوى الأول حيث يتم تخزين البيانات في RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
يدير RocksDB File Manager استخدام مساحة قرص ملف SST الفعلية وحذفها. يمثل هذا المقياس الملفات المضغوطة غير المضغوطة بالبايت كما تم الإبلاغ عنها بواسطة إدارة الملفات. |
كائن المصادر (Kafka)
مقياس | الوصف |
---|---|
sources.description |
اسم المصدر الذي يقرأ منه استعلام البث. على سبيل المثال، “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” |
sources.startOffset العنصر |
بدء رقم الإزاحة داخل موضوع Kafka الذي بدأت مهمة الدفق فيه. |
sources.endOffset العنصر |
أحدث إزاحة تمت معالجتها بواسطة الميكروباتش. يمكن أن يكون هذا مساويا latestOffset لتنفيذ الميكروبات الجارية. |
sources.latestOffset العنصر |
أحدث إزاحة تم رسمها بواسطة الميكروباتش. عندما يكون هناك تقييد، قد لا تعالج عملية الدفعات الصغيرة جميع الإزاحات، مما endOffset يسبب ويختلف latestOffset . |
sources.numInputRows |
عدد صفوف الإدخال التي تمت معالجتها من هذا المصدر. |
sources.inputRowsPerSecond |
معدل وصول البيانات للمعالجة لهذا المصدر. |
sources.processedRowsPerSecond |
معدل معالجة Spark للبيانات لهذا المصدر. |
عنصر sources.metrics (Kafka)
مقياس | الوصف |
---|---|
sources.metrics.avgOffsetsBehindLatest |
متوسط عدد الإزاحات التي يكون الاستعلام المتدفق خلف أحدث إزاحة متاحة بين جميع الموضوعات المشتركة. |
sources.metrics.estimatedTotalBytesBehindLatest |
العدد المقدر لوحدات البايت التي لم تستهلكها عملية الاستعلام من الموضوعات المشتركة. |
sources.metrics.maxOffsetsBehindLatest |
الحد الأقصى لعدد الإزاحات التي يكون الاستعلام المتدفق خلف أحدث إزاحة متاحة بين جميع الموضوعات المشتركة. |
sources.metrics.minOffsetsBehindLatest |
الحد الأدنى لعدد الإزاحات التي يكون الاستعلام المتدفق خلف أحدث إزاحة متاحة بين جميع الموضوعات المشتركة. |
كائن المتلقي (Kafka)
مقياس | الوصف |
---|---|
sink.description |
اسم المتلقي الذي يكتب إليه استعلام البث. على سبيل المثال، “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” |
sink.numOutputRows |
عدد الصفوف التي تمت كتابتها إلى جدول الإخراج أو المتلقي كجزء من الميكروباتش. بالنسبة لبعض الحالات، يمكن أن تكون هذه القيمة "-1" ويمكن تفسيرها بشكل عام على أنها "غير معروفة". |
كائن المصادر (Delta Lake)
مقياس | الوصف |
---|---|
sources.description |
اسم المصدر الذي يقرأ منه استعلام البث. على سبيل المثال، “DeltaSource[table]” |
sources.[startOffset/endOffset].sourceVersion |
إصدار التسلسل الذي يتم ترميز هذه الإزاحة به. |
sources.[startOffset/endOffset].reservoirId |
معرف الجدول الذي تقرأ منه. يتم استخدام هذا للكشف عن التكوين الخاطئ عند إعادة تشغيل استعلام. |
sources.[startOffset/endOffset].reservoirVersion |
إصدار الجدول الذي تقوم بمعالجته حاليا. |
sources.[startOffset/endOffset].index |
الفهرس في تسلسل AddFiles في هذا الإصدار. يتم استخدام هذا لتقسيم التثبيتات الكبيرة إلى دفعات متعددة. يتم إنشاء هذا الفهرس عن طريق الفرز على modificationTimestamp و path . |
sources.[startOffset/endOffset].isStartingVersion |
ما إذا كانت هذه الإزاحة تشير إلى استعلام يبدأ بدلا من معالجة التغييرات. عند بدء استعلام جديد، تتم معالجة جميع البيانات الموجودة في الجدول في البداية، ثم تتم معالجة البيانات الجديدة التي وصلت. |
sources.latestOffset |
أحدث إزاحة تمت معالجتها بواسطة استعلام microbatch. |
sources.numInputRows |
عدد صفوف الإدخال التي تمت معالجتها من هذا المصدر. |
sources.inputRowsPerSecond |
معدل وصول البيانات للمعالجة لهذا المصدر. |
sources.processedRowsPerSecond |
معدل معالجة Spark للبيانات لهذا المصدر. |
sources.metrics.numBytesOutstanding |
حجم الملفات المعلقة (الملفات التي تتبعها RocksDB) مجتمعة. هذا هو مقياس تراكم دلتا والتحميل التلقائي كمصدر الدفق. |
sources.metrics.numFilesOutstanding |
عدد الملفات المعلقة التي ستتم معالجتها. هذا هو مقياس تراكم دلتا والتحميل التلقائي كمصدر الدفق. |
كائن المتلقي (Delta Lake)
مقياس | الوصف |
---|---|
sink.description |
اسم المتلقي الذي يكتب الاستعلام المتدفق إليه. على سبيل المثال، “DeltaSink[table]” |
sink.numOutputRows |
عدد الصفوف في هذا المقياس هو "-1" لأن Spark لا يمكنه استنتاج صفوف الإخراج لأحواض DSv1، وهو تصنيف متلقي Delta Lake. |
الأمثلة
مثال حدث Kafka-to-Kafka StreamingQueryListener
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
مثال حدث Delta Lake-to-Delta Lake StreamingQueryListener
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
مثال على مصدر السعر لحدث Delta Lake StreamingQueryListener
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}