Memantau kueri Streaming Terstruktur di Azure Databricks
Azure Databricks menyediakan pemantauan bawaan untuk aplikasi Streaming Terstruktur melalui antarmuka pengguna Spark di bawah tab Streaming .
Membedakan kueri Streaming Terstruktur di antarmuka pengguna Spark
Berikan nama kueri unik kepada streaming Anda dengan menambahkan .queryName(<query-name>)
ke kode Anda writeStream
untuk dengan mudah membedakan metrik mana yang termasuk dalam aliran mana di antarmuka pengguna Spark.
Mendorong metrik Streaming Terstruktur ke layanan eksternal
Metrik streaming dapat didorong ke layanan eksternal untuk memperingatkan atau dasbor kasus penggunaan dengan menggunakan antarmuka Pendengar Kueri Streaming Apache Spark. Di Databricks Runtime 11.3 LTS ke atas, Pendengar Kueri Streaming tersedia di Python dan Scala.
Penting
Kredensial dan objek yang dikelola oleh Katalog Unity tidak dapat digunakan dalam StreamingQueryListener
logika.
Catatan
Latensi pemrosesan dengan pendengar dapat secara signifikan memengaruhi kecepatan pemrosesan kueri. Disarankan untuk membatasi logika pemrosesan di listener ini dan memilih menulis ke sistem respons cepat seperti Kafka untuk efisiensi.
Kode berikut menyediakan contoh dasar sintaks untuk mengimplementasikan pendengar:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Menentukan metrik yang dapat diamati dalam Streaming Terstruktur
Metrik yang dapat diamati diberi nama fungsi agregat arbitrer yang dapat ditentukan pada kueri (DataFrame). Segera setelah eksekusi DataFrame mencapai titik penyelesaian (yaitu, menyelesaikan kueri batch atau mencapai periode streaming), peristiwa bernama dipancarkan yang berisi metrik untuk data yang diproses sejak titik penyelesaian terakhir.
Anda dapat mengamati metrik ini dengan melampirkan pendengar ke sesi Spark. Pendengar tergantung pada mode eksekusi:
Mode batch: Gunakan
QueryExecutionListener
.QueryExecutionListener
dipanggil saat kueri selesai. Akses metrik menggunakan petaQueryExecution.observedMetrics
.Streaming, atau microbatch: Gunakan
StreamingQueryListener
.StreamingQueryListener
dipanggil ketika kueri streaming menyelesaikan epoch. Akses metrik menggunakan petaStreamingQueryProgress.observedMetrics
. Azure Databricks tidak mendukung streaming eksekusi berkelanjutan.
Contohnya:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Metrik objek StreamingQueryListener
Metrik | Deskripsi |
---|---|
id |
ID kueri unik yang bertahan di seluruh hidupkan ulang. |
runId |
Id kueri yang unik untuk setiap mulai/mulai ulang. Lihat StreamingQuery.runId(). |
name |
Nama kueri yang ditentukan pengguna. Nama null jika tidak ada nama yang ditentukan. |
timestamp |
Tanda waktu untuk eksekusi microbatch. |
batchId |
ID unik untuk batch data saat ini yang sedang diproses. Dalam kasus percobaan ulang setelah kegagalan, ID batch tertentu dapat dijalankan lebih dari sekali. Demikian pula, ketika tidak ada data yang akan diproses, ID batch tidak ditambahkan. |
numInputRows |
Jumlah rekaman agregat (di semua sumber) yang diproses dalam pemicu. |
inputRowsPerSecond |
Tingkat agregat (di semua sumber) data yang tiba. |
processedRowsPerSecond |
Laju agregat (di semua sumber) tempat Spark memproses data. |
objek durationMs
Informasi tentang waktu yang diperlukan untuk menyelesaikan berbagai tahap proses eksekusi microbatch.
Metrik | Deskripsi |
---|---|
durationMs.addBatch |
Waktu yang dibutuhkan untuk mengeksekusi microbatch. Ini tidak termasuk waktu yang dibutuhkan Spark untuk merencanakan microbatch. |
durationMs.getBatch |
Waktu yang diperlukan untuk mengambil metadata tentang offset dari sumber. |
durationMs.latestOffset |
Offset terbaru yang dikonsumsi untuk microbatch. Objek kemajuan ini mengacu pada waktu yang diperlukan untuk mengambil offset terbaru dari sumber. |
durationMs.queryPlanning |
Waktu yang diperlukan untuk menghasilkan rencana eksekusi. |
durationMs.triggerExecution |
Waktu yang diperlukan untuk merencanakan dan menjalankan microbatch. |
durationMs.walCommit |
Waktu yang diperlukan untuk menerapkan offset baru yang tersedia. |
objek eventTime
Informasi tentang nilai waktu peristiwa yang terlihat dalam data yang sedang diproses di microbatch. Data ini digunakan oleh marka air untuk mencari tahu cara memangkas status untuk memproses agregasi stateful yang ditentukan dalam pekerjaan Streaming Terstruktur.
Metrik | Deskripsi |
---|---|
eventTime.avg |
Waktu peristiwa rata-rata yang terlihat dalam pemicu tersebut. |
eventTime.max |
Waktu peristiwa maksimum yang terlihat dalam pemicu tersebut. |
eventTime.min |
Waktu peristiwa minimum yang terlihat dalam pemicu tersebut. |
eventTime.watermark |
Nilai marka air yang digunakan dalam pemicu tersebut. |
objek stateOperators
Informasi tentang operasi stateful yang ditentukan dalam pekerjaan Streaming Terstruktur dan agregasi yang dihasilkan dari operasi tersebut.
Metrik | Deskripsi |
---|---|
stateOperators.operatorName |
Nama operator stateful yang terkait dengan metrik, seperti symmetricHashJoin , , dedupe stateStoreSave . |
stateOperators.numRowsTotal |
Jumlah total baris dalam status sebagai hasil dari operator stateful atau agregasi. |
stateOperators.numRowsUpdated |
Jumlah total baris yang diperbarui dalam status sebagai hasil dari operator atau agregasi stateful. |
stateOperators.allUpdatesTimeMs |
Metrik ini saat ini tidak dapat diukur oleh Spark dan direncanakan akan dihapus dalam pembaruan mendatang. |
stateOperators.numRowsRemoved |
Jumlah total baris yang dihapus dari status sebagai akibat dari operator atau agregasi stateful. |
stateOperators.allRemovalsTimeMs |
Metrik ini saat ini tidak dapat diukur oleh Spark dan direncanakan akan dihapus dalam pembaruan mendatang. |
stateOperators.commitTimeMs |
Waktu yang diperlukan untuk menerapkan semua pembaruan (menempatkan dan menghapus) dan mengembalikan versi baru. |
stateOperators.memoryUsedBytes |
Memori yang digunakan oleh penyimpanan status. |
stateOperators.numRowsDroppedByWatermark |
Jumlah baris yang dianggap terlambat untuk disertakan dalam agregasi stateful. Agregasi streaming saja: Jumlah baris yang dihilangkan pasca-agregasi (bukan baris input mentah). Jumlah ini tidak tepat, tetapi memberikan indikasi bahwa ada data terlambat yang dihilangkan. |
stateOperators.numShufflePartitions |
Jumlah partisi acak untuk operator stateful ini. |
stateOperators.numStateStoreInstances |
Instans penyimpanan status aktual yang telah diinisialisasi dan dikelola operator. Untuk banyak operator stateful, ini sama dengan jumlah partisi. Namun, gabungan stream-stream menginisialisasi empat instans penyimpanan status per partisi. |
objek stateOperators.customMetrics
Informasi yang dikumpulkan dari RocksDB menangkap metrik tentang performa dan operasinya sehubungan dengan nilai stateful yang dikelolanya untuk pekerjaan Streaming Terstruktur. Untuk informasi selengkapnya, lihat Mengonfigurasi penyimpanan status RocksDB di Azure Databricks.
Metrik | Deskripsi |
---|---|
customMetrics.rocksdbBytesCopied |
Jumlah byte yang disalin seperti yang dilacak oleh RocksDB File Manager. |
customMetrics.rocksdbCommitCheckpointLatency |
Waktu dalam milidetik mengambil rekam jepret RocksDB asli dan menulisnya ke direktori lokal. |
customMetrics.rocksdbCompactLatency |
Waktu dalam milidetik pemadatan (opsional) selama penerapan titik pemeriksaan. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
Waktu dalam milidetik menyinkronkan rekam jepret RocksDB asli ke penyimpanan eksternal (lokasi titik pemeriksaan). |
customMetrics.rocksdbCommitFlushLatency |
Waktu dalam milidetik menyiram perubahan dalam memori RocksDB ke disk lokal. |
customMetrics.rocksdbCommitPauseLatency |
Waktu dalam milidetik menghentikan utas pekerja latar belakang sebagai bagian dari penerapan titik pemeriksaan, seperti untuk pemadatan. |
customMetrics.rocksdbCommitWriteBatchLatency |
Waktu dalam milidetik menerapkan penulisan bertahap dalam struktur dalam memori (WriteBatch ) ke RocksDB asli. |
customMetrics.rocksdbFilesCopied |
Jumlah file yang disalin sebagai dilacak oleh RocksDB File Manager. |
customMetrics.rocksdbFilesReused |
Jumlah file yang digunakan kembali seperti yang dilacak oleh RocksDB File Manager. |
customMetrics.rocksdbGetCount |
Jumlah get panggilan ke DB (tidak termasuk gets dari WriteBatch - batch dalam memori yang digunakan untuk penulisan penahapan). |
customMetrics.rocksdbGetLatency |
Waktu rata-rata dalam nanodetik untuk panggilan asli RocksDB::Get yang mendasar. |
customMetrics.rocksdbReadBlockCacheHitCount |
Jumlah hit cache dari cache blok di RocksDB yang berguna dalam menghindari pembacaan disk lokal. |
customMetrics.rocksdbReadBlockCacheMissCount |
Jumlah cache blok di RocksDB tidak berguna dalam menghindari pembacaan disk lokal. |
customMetrics.rocksdbSstFileSize |
Ukuran semua file Static Sorted Table (SST) - struktur tabular yang digunakan RocksDB untuk menyimpan data. |
customMetrics.rocksdbTotalBytesRead |
Jumlah byte yang tidak dikompresi yang dibaca oleh get operasi. |
customMetrics.rocksdbTotalBytesReadByCompaction |
Jumlah byte yang dibaca proses pemadatan dari disk. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
Jumlah total byte dari data yang tidak dikompresi yang dibaca menggunakan iterator. Beberapa operasi stateful (misalnya, pemrosesan batas waktu masuk FlatMapGroupsWithState dan watermarking) memerlukan membaca data di DB melalui iterator. |
customMetrics.rocksdbTotalBytesWritten |
Jumlah total byte yang tidak dikompresi yang ditulis oleh put operasi. |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
Jumlah total byte yang ditulis proses pemadatan ke disk. |
customMetrics.rocksdbTotalCompactionLatencyMs |
Waktu dalam milidetik untuk pemadatan RocksDB, termasuk pemadatan latar belakang dan pemadatan opsional yang dimulai selama penerapan. |
customMetrics.rocksdbTotalFlushLatencyMs |
Total waktu flush, termasuk flushing latar belakang. Operasi flush adalah proses yang MemTable dibersihkan ke penyimpanan setelah penuh. MemTables adalah tingkat pertama tempat data disimpan di RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
Ukuran dalam byte file zip yang tidak dikompresi seperti yang dilaporkan oleh File Manager. File Manager mengelola pemanfaatan dan penghapusan ruang disk file SST fisik. |
objek sumber (Kafka)
Metrik | Deskripsi |
---|---|
sources.description |
Deskripsi terperinci tentang sumber Kafka, menentukan topik Kafka yang tepat yang dibaca. Misalnya: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” . |
Objek sources.startOffset |
Nomor offset awal dalam topik Kafka tempat pekerjaan streaming dimulai. |
Objek sources.endOffset |
Offset terakhir yang diproses oleh microbatch. Ini bisa sama dengan latestOffset untuk eksekusi mikrobatch yang sedang berlangsung. |
Objek sources.latestOffset |
Offset terbaru yang dipikirkan oleh microbatch. Proses microbatching mungkin tidak memproses semua offset ketika ada pembatasan, yang mengakibatkan endOffset dan latestOffset perbedaan. |
sources.numInputRows |
Jumlah baris input yang diproses dari sumber ini. |
sources.inputRowsPerSecond |
Tingkat di mana data tiba untuk diproses dari sumber ini. |
sources.processedRowsPerSecond |
Tingkat di mana Spark memproses data dari sumber ini. |
objek sources.metrics (Kafka)
Metrik | Deskripsi |
---|---|
sources.metrics.avgOffsetsBehindLatest |
Jumlah rata-rata offset yang kueri streaming berada di belakang offset terbaru yang tersedia di antara semua topik berlangganan. |
sources.metrics.estimatedTotalBytesBehindLatest |
Perkiraan jumlah byte yang belum digunakan proses kueri dari topik berlangganan. |
sources.metrics.maxOffsetsBehindLatest |
Jumlah maksimum offset yang kueri streaming berada di belakang offset terbaru yang tersedia di antara semua topik berlangganan. |
sources.metrics.minOffsetsBehindLatest |
Jumlah minimum offset yang kueri streaming berada di belakang offset terbaru yang tersedia di antara semua topik berlangganan. |
objek sink (Kafka)
Metrik | Deskripsi |
---|---|
sink.description |
Deskripsi sink Kafka tempat kueri streaming menulis, merinci implementasi sink Kafka tertentu yang digunakan. Misalnya: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” . |
sink.numOutputRows |
Jumlah baris yang ditulis ke tabel output atau sink sebagai bagian dari microbatch. Untuk beberapa situasi, nilai ini dapat berupa "-1" dan umumnya dapat ditafsirkan sebagai "tidak diketahui". |
objek sumber (Delta Lake)
Metrik | Deskripsi |
---|---|
sources.description |
Deskripsi sumber dari mana kueri streaming dibaca. Misalnya: “DeltaSource[table]” . |
sources.[startOffset/endOffset].sourceVersion |
Versi serialisasi di mana offset ini dikodekan. |
sources.[startOffset/endOffset].reservoirId |
ID tabel yang sedang dibaca. Ini digunakan untuk mendeteksi kesalahan konfigurasi saat memulai ulang kueri. |
sources.[startOffset/endOffset].reservoirVersion |
Versi tabel yang sedang diproses. |
sources.[startOffset/endOffset].index |
Indeks dalam urutan AddFiles dalam versi ini. Ini digunakan untuk memecah penerapan besar menjadi beberapa batch. Indeks ini dibuat dengan mengurutkan pada modificationTimestamp dan path . |
sources.[startOffset/endOffset].isStartingVersion |
Mengidentifikasi apakah offset saat ini menandai dimulainya kueri streaming baru daripada pemrosesan perubahan yang terjadi setelah data awal diproses. Saat memulai kueri baru, semua data yang ada dalam tabel di awal diproses terlebih dahulu, lalu data baru apa pun yang tiba. |
sources.latestOffset |
Offset terbaru yang diproses oleh kueri microbatch. |
sources.numInputRows |
Jumlah baris input yang diproses dari sumber ini. |
sources.inputRowsPerSecond |
Tingkat di mana data tiba untuk diproses dari sumber ini. |
sources.processedRowsPerSecond |
Tingkat di mana Spark memproses data dari sumber ini. |
sources.metrics.numBytesOutstanding |
Ukuran gabungan file yang luar biasa (file yang dilacak oleh RocksDB). Ini adalah metrik backlog untuk Delta dan Auto Loader sebagai sumber streaming. |
sources.metrics.numFilesOutstanding |
Jumlah file terutang yang akan diproses. Ini adalah metrik backlog untuk Delta dan Auto Loader sebagai sumber streaming. |
objek sink (Delta Lake)
Metrik | Deskripsi |
---|---|
sink.description |
Deskripsi sink Delta, merinci implementasi sink Delta tertentu yang digunakan. Misalnya: “DeltaSink[table]” . |
sink.numOutputRows |
Jumlah baris selalu "-1" karena Spark tidak dapat menyimpulkan baris output untuk sink DSv1, yang merupakan klasifikasi untuk sink Delta Lake. |
Contoh
Contoh peristiwa Kafka-ke-Kafka StreamingQueryListener
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Contoh peristiwa Delta Lake-to-Delta Lake StreamingQueryListener
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Contoh peristiwa Kinesis-ke-Delta Lake StreamingQueryListener
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Contoh peristiwa Kafka+Delta Lake-to-Delta Lake StreamingQueryListener
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Contoh sumber laju ke peristiwa Delta Lake StreamingQueryListener
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}