Share via


Azure Databricks で構造化ストリーミング クエリを監視する

Azure Databricks では、[ストリーミング] タブの Spark UI で、構造化ストリーミング アプリケーションの監視が組み込みで提供されています。

Spark UI で構造化ストリーミング クエリを区別する

writeStream コードに .queryName(<query-name>) を追加し、ストリームに一意のクエリ名を指定すると、Spark UI でどのメトリックがどのストリームのものであるかを簡単に区別できます。

構造化ストリーミング メトリックを外部サービスにプッシュする

Apache Spark のストリーミング クエリ リスナー インターフェイスを使用して、アラートまたはダッシュボードのユース ケース用に外部サービスにストリーミング メトリックをプッシュできます。 Databricks Runtime 11.3 LTS 以上では、Python と Scala でストリーミング クエリ リスナーを使用できます。

重要

Unity Catalog によって管理される資格情報とオブジェクトは、StreamingQueryListener ロジックでは使用できません。

Note

リスナーに関連する処理待機時間は、クエリ処理に悪影響を及ぼす可能性があります。 Databricks では、これらのリスナーの処理ロジックを最小限に抑え、Kafka などの待機時間の短いシンクに書き込むことをお勧めします。

リスナーを実装するための構文の基本的な例を次のコードに示します。

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

構造化ストリーミングでの監視可能なメトリックを定義する

監視可能なメトリックは、クエリ (DataFrame) で定義できる任意の名前付き集計関数です。 DataFrame の実行が完了ポイントに達すると (つまり、バッチ クエリが完了するか、ストリーミング エポックに達すると)、最後の完了ポイント以降に処理されたデータのメトリックを含む名前付きイベントが生成されます。

Spark セッションにリスナーをアタッチすると、これらのメトリックを監視できます。 リスナーは実行モードによって異なります。

  • バッチ モード: QueryExecutionListener を使用します。

    QueryExecutionListener は、クエリの完了時に呼び出されます。 QueryExecution.observedMetrics マップを使用してメトリックにアクセスします。

  • ストリーミング、またはマイクロバッチ: StreamingQueryListener を使用します。

    StreamingQueryListener は、ストリーミング クエリがエポックを完了したときに呼び出されます。 StreamingQueryProgress.observedMetrics マップを使用してメトリックにアクセスします。 Azure Databricks では、継続的な実行ストリーミングはサポートされていません。

次に例を示します。

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

StreamingQueryListener オブジェクトのメトリック

メトリック 説明
id 再起動が行われても保持される一意のクエリ ID。 StreamingQuery.id() に関する説明をご覧ください。
runId 起動または再起動ごとの一意のクエリ ID。 StreamingQuery.runId() に関する説明をご覧ください。
name ユーザーが指定したクエリの名前。 指定していない場合は Null。
timestamp マイクロバッチの実行のタイムスタンプ。
batchId 処理されているデータの現在のバッチの一意の ID。 失敗後の再試行の場合、特定のバッチ ID を複数回実行できることに注意してください。 同様に、処理するデータがない場合、バッチ ID はインクリメントされません。
numInputRows トリガーで処理されたレコードの数を (すべてのソースについて) 集計します。
inputRowsPerSecond データの到着速度を (すべてのソースについて) 集計します。
processedRowsPerSecond Spark によるデータの処理速度を (すべてのソースについて) 集計します。

durationMs オブジェクト

マイクロバッチ実行プロセスのさまざまなステージが完了するまでにかかる時間に関する情報。

メトリック 説明
durationMs.addBatch マイクロバッチの実行にかかった時間。 これには、Spark がマイクロバッチの計画に要した時間は含まれません。
durationMs.getBatch オフセットに関するメタデータをソースから取得するのにかかる時間。
durationMs.latestOffset マイクロバッチに使われた最新のオフセット。 この進行状況オブジェクトは、ソースから最新のオフセットを取得するのにかかった時間を示します。
durationMs.queryPlanning 実行プランの生成にかかった時間。
durationMs.triggerExecution マイクロバッチの計画と実行にかかった時間。
durationMs.walCommit 使用可能な新しいオフセットをコミットするのにかかった時間。

eventTime オブジェクト

マイクロバッチで処理されているデータ内で示されたイベント時間の値に関する情報。 このデータは、構造化ストリーミング ジョブで定義されているステートフル集計を処理するための状態のトリミング方法を把握するため、ウォーターマークによって使用されます。

メトリック 説明
eventTime.avg トリガーで示された平均イベント時間。
eventTime.max トリガーで示された最大イベント時間。
eventTime.min トリガーで示された最小イベント時間。
eventTime.watermark トリガーで使われたウォーターマークの値。

stateOperators オブジェクト

構造化ストリーミング ジョブで定義されているステートフル操作と、そこから生成された集計に関する情報。

メトリック 説明
stateOperators.operatorName メトリックと関係のあるステートフル演算子の名前。 たとえば、symmetricHashJoindedupestateStoreSave などです。
stateOperators.numRowsTotal ステートフル演算子または集計の結果としての状態の行数。
stateOperators.numRowsUpdated ステートフル演算子または集計の結果として状態で更新された行数。
stateOperators.numRowsRemoved ステートフル演算子または集計の結果として状態から削除された行数。
stateOperators.commitTimeMs すべての更新 (書き込みと削除) をコミットし、新しいバージョンを返すのにかかった時間。
stateOperators.memoryUsedBytes 状態ストアによって使われたメモリ。
stateOperators.numRowsDroppedByWatermark ステートフル集計に含めるには遅すぎると見なされた行の数。 ストリーミング集計のみ: 生の入力行ではなく、集計後に削除された行の数。 値は正確ではありませんが、遅いデータが削除されていることを示している可能性があります。
stateOperators.numShufflePartitions このステートフル演算子のシャッフル パーティションの数。
stateOperators.numStateStoreInstances 演算子が初期化および保守した実際の状態ストア インスタンス。 多くのステートフル演算子では、これはパーティションの数と同じですが、ストリーム同士の結合ではパーティションごとに 4 つの状態ストア インスタンスが初期化されます。

stateOperators.customMetrics オブジェクト

RocksDB から収集された、構造化ストリーミング ジョブについてそれが保持するステートフル値に関するそのパフォーマンスと操作についてのメトリックをキャプチャする情報。 詳細については、「Azure Databricks で RocksDB 状態ストアを構成する」を参照してください。

メトリック 説明
customMetrics.rocksdbBytesCopied RocksDB ファイル マネージャーによって追跡された、コピーされたバイト数。
customMetrics.rocksdbCommitCheckpointLatency ネイティブ RocksDB のスナップショットを取得し、それをローカル ディレクトリに書き込むのにかかった時間 (ミリ秒単位)。
customMetrics.rocksdbCompactLatency チェックポイント コミットの間の圧縮 (省略可能) に要した時間 (ミリ秒単位)。
customMetrics.rocksdbCommitFileSyncLatencyMs ネイティブ RocksDB スナップショット関連のファイルを外部ストレージ (チェックポイントの場所) に同期するのにかかった時間 (ミリ秒単位)。
customMetrics.rocksdbCommitFlushLatency RocksDB のメモリ内の変更をローカル ディスクにフラッシュするのにかかった時間 (ミリ秒単位)。
customMetrics.rocksdbCommitPauseLatency チェックポイント コミットの一部としてバックグラウンド ワーカー スレッド (圧縮など) の停止にかかった時間 (ミリ秒単位)。
customMetrics.rocksdbCommitWriteBatchLatency メモリ内構造 (WriteBatch) でのステージされた書き込みをネイティブ RocksDB に適用するのにかかった時間 (ミリ秒単位)。
customMetrics.rocksdbFilesCopied RocksDB ファイル マネージャーによって追跡された、コピーされたファイルの数。
customMetrics.rocksdbFilesReused RocksDB ファイル マネージャーによって追跡された、再利用されたファイルの数。
customMetrics.rocksdbGetCount DB に対する get の呼び出しの数 (これには、ステージング書き込みに使われるメモリ内バッチである WriteBatch からの gets は含まれません)。
customMetrics.rocksdbGetLatency 基になるネイティブ RocksDB::Get 呼び出しの平均時間 (ナノ秒単位)。
customMetrics.rocksdbReadBlockCacheHitCount RocksDB のブロック キャッシュで、役に立つものの量、または役に立たずにローカル ディスクの読み取りができないものの量。
customMetrics.rocksdbReadBlockCacheMissCount RocksDB のブロック キャッシュで、役に立つものの量、または役に立たずにローカル ディスクの読み取りができないものの量。
customMetrics.rocksdbSstFileSize すべての SST ファイルのサイズ。 SST は Static Sorted Table (静的並べ替え済みテーブル) の頭文字で、これは RocksDB がデータの格納に使用する表形式の構造です。
customMetrics.rocksdbTotalBytesRead get 操作によって読み取られた非圧縮バイト数。
customMetrics.rocksdbTotalBytesReadByCompaction 圧縮プロセスがディスクから読み取ったバイト数。
customMetrics.rocksdbTotalBytesReadThroughIterator 一部のステートフル操作 (FlatMapGroupsWithState でのタイムアウト処理や、ウォーターマーク処理など) では、反復子を使って DB 内のデータを読み取る必要があります。 このメトリックは、反復子を使って読み取られた非圧縮データのサイズを表します。
customMetrics.rocksdbTotalBytesWritten put 操作によって書き込まれた非圧縮バイト数。
customMetrics.rocksdbTotalBytesWrittenByCompaction 圧縮プロセスがディスクに書き込んだバイト数。
customMetrics.rocksdbTotalCompactionLatencyMs バックグラウンド圧縮や、コミット中に開始されたオプションの圧縮など、RocksDB での圧縮の時間 (ミリ秒単位)。
customMetrics.rocksdbTotalFlushLatencyMs バックグラウンド フラッシュを含むフラッシュ時間。 フラッシュ操作は、一杯になった MemTable がストレージにフラッシュされるプロセスです。 MemTable は、データが RocksDB に格納される最初のレベルです。
customMetrics.rocksdbZipFileBytesUncompressed RocksDB ファイル マネージャーは、物理 SST ファイルのディスク領域の使用と削除を管理します。 このメトリックは、ファイル マネージャーによって報告された、圧縮されていない ZIP ファイルのバイト数を表します。

sources オブジェクト (Kafka)

メトリック 説明
sources.description ストリーミング クエリの読み取り元のソースの名前。 たとえば、「 “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” 」のように入力します。
sources.startOffset オブジェクト ストリーミング ジョブが開始した、Kafka トピック内の開始オフセット番号。
sources.endOffset オブジェクト マイクロバッチによって処理された最新のオフセット。 進行中のマイクロバッチ実行では、これは latestOffset と等しい場合があります。
sources.latestOffset オブジェクト マイクロバッチによって把握された最新のオフセット。 スロットリングがある場合、マイクロバッチ処理プロセスですべてのオフセットが処理されず、そのために endOffsetlatestOffset が異なる可能性があります。
sources.numInputRows このソースから処理された入力行数。
sources.inputRowsPerSecond このソースの処理に関するデータ到着速度。
sources.processedRowsPerSecond このソースに関する Spark のデータ処理速度。

sources.metrics オブジェクト (Kafka)

メトリック 説明
sources.metrics.avgOffsetsBehindLatest サブスクライブされたすべてのトピックの中で、ストリーミング クエリが利用可能な最新のオフセットより後であるオフセットの平均数。
sources.metrics.estimatedTotalBytesBehindLatest クエリ プロセスがサブスクライブされたトピックから消費していない推定バイト数。
sources.metrics.maxOffsetsBehindLatest サブスクライブされたすべてのトピックの中で、ストリーミング クエリが利用可能な最新のオフセットより後であるオフセットの最大数。
sources.metrics.minOffsetsBehindLatest サブスクライブされたすべてのトピックの中で、ストリーミング クエリが利用可能な最新のオフセットより後であるオフセットの最小数。

sink オブジェクト (Kafka)

メトリック 説明
sink.description ストリーミング クエリが書き込むシンクの名前。 たとえば、「 “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” 」のように入力します。
sink.numOutputRows マイクロバッチの一部として出力テーブルまたはシンクに書き込まれた行の数。 状況によっては、この値が "-1" になることがあり、一般に、"不明" と解釈できます。

sources オブジェクト (Delta Lake)

メトリック 説明
sources.description ストリーミング クエリの読み取り元のソースの名前。 たとえば、「 “DeltaSource[table]” 」のように入力します。
sources.[startOffset/endOffset].sourceVersion このオフセットがエンコードされるシリアル化のバージョン。
sources.[startOffset/endOffset].reservoirId 読み取り元のテーブルの ID。 これは、クエリを再開するときに誤った構成を検出するために使われます。
sources.[startOffset/endOffset].reservoirVersion 現在処理中のテーブルのバージョン。
sources.[startOffset/endOffset].index このバージョンの AddFiles のシーケンス内のインデックス。 これは、大きなコミットを複数のバッチに分割するために使われます。 このインデックスは、modificationTimestamppath で並べ替えることによって作成されます。
sources.[startOffset/endOffset].isStartingVersion このオフセットが、変更を処理しているクエリではなく開始しているクエリを表すかどうか。 新しいクエリを開始すると、開始時にテーブルに存在するすべてのデータが処理された後、到着した新しいデータが処理されます。
sources.latestOffset マイクロバッチ クエリによって処理された最新のオフセット。
sources.numInputRows このソースから処理された入力行数。
sources.inputRowsPerSecond このソースの処理に関するデータ到着速度。
sources.processedRowsPerSecond このソースに関する Spark のデータ処理速度。
sources.metrics.numBytesOutstanding 未処理のファイルの合計サイズ (RocksDB によって追跡されているファイル)。 これは、ストリーミング ソースとしての Delta および自動ローダーのバックログ メトリックです。
sources.metrics.numFilesOutstanding 処理される予定の未処理ファイルの数。 これは、ストリーミング ソースとしての Delta および自動ローダーのバックログ メトリックです。

sink オブジェクト (Delta Lake)

メトリック 説明
sink.description ストリーミング クエリが書き込むシンクの名前。 たとえば、「 “DeltaSink[table]” 」のように入力します。
sink.numOutputRows Spark では DSv1 シンク (Delta Lake シンクの分類) の出力行数を推定できないため、このメトリックの行数は "-1" です。

Kafka 同士の間の StreamingQueryListener イベントの例

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Delta Lake 同士の間の StreamingQueryListener イベントの例

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Delta Lake の StreamingQueryListener イベントに対するレート ソースの例

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}