Bagikan melalui


Memantau kueri Streaming Terstruktur di Azure Databricks

Azure Databricks menyediakan pemantauan bawaan untuk aplikasi Streaming Terstruktur melalui antarmuka pengguna Spark di bawah tab Streaming .

Membedakan kueri Streaming Terstruktur di antarmuka pengguna Spark

Berikan nama kueri unik kepada streaming Anda dengan menambahkan .queryName(<query-name>) ke kode Anda writeStream untuk dengan mudah membedakan metrik mana yang termasuk dalam aliran mana di antarmuka pengguna Spark.

Mendorong metrik Streaming Terstruktur ke layanan eksternal

Metrik streaming dapat didorong ke layanan eksternal untuk memperingatkan atau dasbor kasus penggunaan dengan menggunakan antarmuka Pendengar Kueri Streaming Apache Spark. Di Databricks Runtime 11.3 LTS ke atas, Pendengar Kueri Streaming tersedia di Python dan Scala.

Penting

Kredensial dan objek yang dikelola oleh Katalog Unity tidak dapat digunakan dalam StreamingQueryListener logika.

Catatan

Latensi pemrosesan dengan pendengar dapat secara signifikan memengaruhi kecepatan pemrosesan kueri. Disarankan untuk membatasi logika pemrosesan di listener ini dan memilih menulis ke sistem respons cepat seperti Kafka untuk efisiensi.

Kode berikut menyediakan contoh dasar sintaks untuk mengimplementasikan pendengar:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Menentukan metrik yang dapat diamati dalam Streaming Terstruktur

Metrik yang dapat diamati diberi nama fungsi agregat arbitrer yang dapat ditentukan pada kueri (DataFrame). Segera setelah eksekusi DataFrame mencapai titik penyelesaian (yaitu, menyelesaikan kueri batch atau mencapai periode streaming), peristiwa bernama dipancarkan yang berisi metrik untuk data yang diproses sejak titik penyelesaian terakhir.

Anda dapat mengamati metrik ini dengan melampirkan pendengar ke sesi Spark. Pendengar tergantung pada mode eksekusi:

  • Mode batch: Gunakan QueryExecutionListener.

    QueryExecutionListener dipanggil saat kueri selesai. Akses metrik menggunakan peta QueryExecution.observedMetrics.

  • Streaming, atau microbatch: Gunakan StreamingQueryListener.

    StreamingQueryListener dipanggil ketika kueri streaming menyelesaikan epoch. Akses metrik menggunakan peta StreamingQueryProgress.observedMetrics. Azure Databricks tidak mendukung streaming eksekusi berkelanjutan.

Contohnya:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Metrik objek StreamingQueryListener

Metrik Deskripsi
id ID kueri unik yang bertahan di seluruh hidupkan ulang. Lihat StreamingQuery.id().
runId Id kueri yang unik untuk setiap mulai/mulai ulang. Lihat StreamingQuery.runId().
name Nama kueri yang ditentukan pengguna. Nama null jika tidak ada nama yang ditentukan.
timestamp Tanda waktu untuk eksekusi microbatch.
batchId ID unik untuk batch data saat ini yang sedang diproses. Dalam kasus percobaan ulang setelah kegagalan, ID batch tertentu dapat dijalankan lebih dari sekali. Demikian pula, ketika tidak ada data yang akan diproses, ID batch tidak ditambahkan.
numInputRows Jumlah rekaman agregat (di semua sumber) yang diproses dalam pemicu.
inputRowsPerSecond Tingkat agregat (di semua sumber) data yang tiba.
processedRowsPerSecond Laju agregat (di semua sumber) tempat Spark memproses data.

objek durationMs

Informasi tentang waktu yang diperlukan untuk menyelesaikan berbagai tahap proses eksekusi microbatch.

Metrik Deskripsi
durationMs.addBatch Waktu yang dibutuhkan untuk mengeksekusi microbatch. Ini tidak termasuk waktu yang dibutuhkan Spark untuk merencanakan microbatch.
durationMs.getBatch Waktu yang diperlukan untuk mengambil metadata tentang offset dari sumber.
durationMs.latestOffset Offset terbaru yang dikonsumsi untuk microbatch. Objek kemajuan ini mengacu pada waktu yang diperlukan untuk mengambil offset terbaru dari sumber.
durationMs.queryPlanning Waktu yang diperlukan untuk menghasilkan rencana eksekusi.
durationMs.triggerExecution Waktu yang diperlukan untuk merencanakan dan menjalankan microbatch.
durationMs.walCommit Waktu yang diperlukan untuk menerapkan offset baru yang tersedia.

objek eventTime

Informasi tentang nilai waktu peristiwa yang terlihat dalam data yang sedang diproses di microbatch. Data ini digunakan oleh marka air untuk mencari tahu cara memangkas status untuk memproses agregasi stateful yang ditentukan dalam pekerjaan Streaming Terstruktur.

Metrik Deskripsi
eventTime.avg Waktu peristiwa rata-rata yang terlihat dalam pemicu tersebut.
eventTime.max Waktu peristiwa maksimum yang terlihat dalam pemicu tersebut.
eventTime.min Waktu peristiwa minimum yang terlihat dalam pemicu tersebut.
eventTime.watermark Nilai marka air yang digunakan dalam pemicu tersebut.

objek stateOperators

Informasi tentang operasi stateful yang ditentukan dalam pekerjaan Streaming Terstruktur dan agregasi yang dihasilkan dari operasi tersebut.

Metrik Deskripsi
stateOperators.operatorName Nama operator stateful yang terkait dengan metrik, seperti symmetricHashJoin, , dedupestateStoreSave.
stateOperators.numRowsTotal Jumlah total baris dalam status sebagai hasil dari operator stateful atau agregasi.
stateOperators.numRowsUpdated Jumlah total baris yang diperbarui dalam status sebagai hasil dari operator atau agregasi stateful.
stateOperators.allUpdatesTimeMs Metrik ini saat ini tidak dapat diukur oleh Spark dan direncanakan akan dihapus dalam pembaruan mendatang.
stateOperators.numRowsRemoved Jumlah total baris yang dihapus dari status sebagai akibat dari operator atau agregasi stateful.
stateOperators.allRemovalsTimeMs Metrik ini saat ini tidak dapat diukur oleh Spark dan direncanakan akan dihapus dalam pembaruan mendatang.
stateOperators.commitTimeMs Waktu yang diperlukan untuk menerapkan semua pembaruan (menempatkan dan menghapus) dan mengembalikan versi baru.
stateOperators.memoryUsedBytes Memori yang digunakan oleh penyimpanan status.
stateOperators.numRowsDroppedByWatermark Jumlah baris yang dianggap terlambat untuk disertakan dalam agregasi stateful. Agregasi streaming saja: Jumlah baris yang dihilangkan pasca-agregasi (bukan baris input mentah). Jumlah ini tidak tepat, tetapi memberikan indikasi bahwa ada data terlambat yang dihilangkan.
stateOperators.numShufflePartitions Jumlah partisi acak untuk operator stateful ini.
stateOperators.numStateStoreInstances Instans penyimpanan status aktual yang telah diinisialisasi dan dikelola operator. Untuk banyak operator stateful, ini sama dengan jumlah partisi. Namun, gabungan stream-stream menginisialisasi empat instans penyimpanan status per partisi.

objek stateOperators.customMetrics

Informasi yang dikumpulkan dari RocksDB menangkap metrik tentang performa dan operasinya sehubungan dengan nilai stateful yang dikelolanya untuk pekerjaan Streaming Terstruktur. Untuk informasi selengkapnya, lihat Mengonfigurasi penyimpanan status RocksDB di Azure Databricks.

Metrik Deskripsi
customMetrics.rocksdbBytesCopied Jumlah byte yang disalin seperti yang dilacak oleh RocksDB File Manager.
customMetrics.rocksdbCommitCheckpointLatency Waktu dalam milidetik mengambil rekam jepret RocksDB asli dan menulisnya ke direktori lokal.
customMetrics.rocksdbCompactLatency Waktu dalam milidetik pemadatan (opsional) selama penerapan titik pemeriksaan.
customMetrics.rocksdbCommitFileSyncLatencyMs Waktu dalam milidetik menyinkronkan rekam jepret RocksDB asli ke penyimpanan eksternal (lokasi titik pemeriksaan).
customMetrics.rocksdbCommitFlushLatency Waktu dalam milidetik menyiram perubahan dalam memori RocksDB ke disk lokal.
customMetrics.rocksdbCommitPauseLatency Waktu dalam milidetik menghentikan utas pekerja latar belakang sebagai bagian dari penerapan titik pemeriksaan, seperti untuk pemadatan.
customMetrics.rocksdbCommitWriteBatchLatency Waktu dalam milidetik menerapkan penulisan bertahap dalam struktur dalam memori (WriteBatch) ke RocksDB asli.
customMetrics.rocksdbFilesCopied Jumlah file yang disalin sebagai dilacak oleh RocksDB File Manager.
customMetrics.rocksdbFilesReused Jumlah file yang digunakan kembali seperti yang dilacak oleh RocksDB File Manager.
customMetrics.rocksdbGetCount Jumlah get panggilan ke DB (tidak termasuk gets dari WriteBatch - batch dalam memori yang digunakan untuk penulisan penahapan).
customMetrics.rocksdbGetLatency Waktu rata-rata dalam nanodetik untuk panggilan asli RocksDB::Get yang mendasar.
customMetrics.rocksdbReadBlockCacheHitCount Jumlah hit cache dari cache blok di RocksDB yang berguna dalam menghindari pembacaan disk lokal.
customMetrics.rocksdbReadBlockCacheMissCount Jumlah cache blok di RocksDB tidak berguna dalam menghindari pembacaan disk lokal.
customMetrics.rocksdbSstFileSize Ukuran semua file Static Sorted Table (SST) - struktur tabular yang digunakan RocksDB untuk menyimpan data.
customMetrics.rocksdbTotalBytesRead Jumlah byte yang tidak dikompresi yang dibaca oleh get operasi.
customMetrics.rocksdbTotalBytesReadByCompaction Jumlah byte yang dibaca proses pemadatan dari disk.
customMetrics.rocksdbTotalBytesReadThroughIterator Jumlah total byte dari data yang tidak dikompresi yang dibaca menggunakan iterator. Beberapa operasi stateful (misalnya, pemrosesan batas waktu masuk FlatMapGroupsWithState dan watermarking) memerlukan membaca data di DB melalui iterator.
customMetrics.rocksdbTotalBytesWritten Jumlah total byte yang tidak dikompresi yang ditulis oleh put operasi.
customMetrics.rocksdbTotalBytesWrittenByCompaction Jumlah total byte yang ditulis proses pemadatan ke disk.
customMetrics.rocksdbTotalCompactionLatencyMs Waktu dalam milidetik untuk pemadatan RocksDB, termasuk pemadatan latar belakang dan pemadatan opsional yang dimulai selama penerapan.
customMetrics.rocksdbTotalFlushLatencyMs Total waktu flush, termasuk flushing latar belakang. Operasi flush adalah proses yang MemTable dibersihkan ke penyimpanan setelah penuh. MemTables adalah tingkat pertama tempat data disimpan di RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed Ukuran dalam byte file zip yang tidak dikompresi seperti yang dilaporkan oleh File Manager. File Manager mengelola pemanfaatan dan penghapusan ruang disk file SST fisik.

objek sumber (Kafka)

Metrik Deskripsi
sources.description Deskripsi terperinci tentang sumber Kafka, menentukan topik Kafka yang tepat yang dibaca. Misalnya: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
Objek sources.startOffset Nomor offset awal dalam topik Kafka tempat pekerjaan streaming dimulai.
Objek sources.endOffset Offset terakhir yang diproses oleh microbatch. Ini bisa sama dengan latestOffset untuk eksekusi mikrobatch yang sedang berlangsung.
Objek sources.latestOffset Offset terbaru yang dipikirkan oleh microbatch. Proses microbatching mungkin tidak memproses semua offset ketika ada pembatasan, yang mengakibatkan endOffset dan latestOffset perbedaan.
sources.numInputRows Jumlah baris input yang diproses dari sumber ini.
sources.inputRowsPerSecond Tingkat di mana data tiba untuk diproses dari sumber ini.
sources.processedRowsPerSecond Tingkat di mana Spark memproses data dari sumber ini.

objek sources.metrics (Kafka)

Metrik Deskripsi
sources.metrics.avgOffsetsBehindLatest Jumlah rata-rata offset yang kueri streaming berada di belakang offset terbaru yang tersedia di antara semua topik berlangganan.
sources.metrics.estimatedTotalBytesBehindLatest Perkiraan jumlah byte yang belum digunakan proses kueri dari topik berlangganan.
sources.metrics.maxOffsetsBehindLatest Jumlah maksimum offset yang kueri streaming berada di belakang offset terbaru yang tersedia di antara semua topik berlangganan.
sources.metrics.minOffsetsBehindLatest Jumlah minimum offset yang kueri streaming berada di belakang offset terbaru yang tersedia di antara semua topik berlangganan.

objek sink (Kafka)

Metrik Deskripsi
sink.description Deskripsi sink Kafka tempat kueri streaming menulis, merinci implementasi sink Kafka tertentu yang digunakan. Misalnya: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows Jumlah baris yang ditulis ke tabel output atau sink sebagai bagian dari microbatch. Untuk beberapa situasi, nilai ini dapat berupa "-1" dan umumnya dapat ditafsirkan sebagai "tidak diketahui".

objek sumber (Delta Lake)

Metrik Deskripsi
sources.description Deskripsi sumber dari mana kueri streaming dibaca. Misalnya: “DeltaSource[table]”.
sources.[startOffset/endOffset].sourceVersion Versi serialisasi di mana offset ini dikodekan.
sources.[startOffset/endOffset].reservoirId ID tabel yang sedang dibaca. Ini digunakan untuk mendeteksi kesalahan konfigurasi saat memulai ulang kueri.
sources.[startOffset/endOffset].reservoirVersion Versi tabel yang sedang diproses.
sources.[startOffset/endOffset].index Indeks dalam urutan AddFiles dalam versi ini. Ini digunakan untuk memecah penerapan besar menjadi beberapa batch. Indeks ini dibuat dengan mengurutkan pada modificationTimestamp dan path.
sources.[startOffset/endOffset].isStartingVersion Mengidentifikasi apakah offset saat ini menandai dimulainya kueri streaming baru daripada pemrosesan perubahan yang terjadi setelah data awal diproses. Saat memulai kueri baru, semua data yang ada dalam tabel di awal diproses terlebih dahulu, lalu data baru apa pun yang tiba.
sources.latestOffset Offset terbaru yang diproses oleh kueri microbatch.
sources.numInputRows Jumlah baris input yang diproses dari sumber ini.
sources.inputRowsPerSecond Tingkat di mana data tiba untuk diproses dari sumber ini.
sources.processedRowsPerSecond Tingkat di mana Spark memproses data dari sumber ini.
sources.metrics.numBytesOutstanding Ukuran gabungan file yang luar biasa (file yang dilacak oleh RocksDB). Ini adalah metrik backlog untuk Delta dan Auto Loader sebagai sumber streaming.
sources.metrics.numFilesOutstanding Jumlah file terutang yang akan diproses. Ini adalah metrik backlog untuk Delta dan Auto Loader sebagai sumber streaming.

objek sink (Delta Lake)

Metrik Deskripsi
sink.description Deskripsi sink Delta, merinci implementasi sink Delta tertentu yang digunakan. Misalnya: “DeltaSink[table]”.
sink.numOutputRows Jumlah baris selalu "-1" karena Spark tidak dapat menyimpulkan baris output untuk sink DSv1, yang merupakan klasifikasi untuk sink Delta Lake.

Contoh

Contoh peristiwa Kafka-ke-Kafka StreamingQueryListener

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Contoh peristiwa Delta Lake-to-Delta Lake StreamingQueryListener

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Contoh peristiwa Kinesis-ke-Delta Lake StreamingQueryListener

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Contoh peristiwa Kafka+Delta Lake-to-Delta Lake StreamingQueryListener

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Contoh sumber laju ke peristiwa Delta Lake StreamingQueryListener

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}