次の方法で共有


Azure Databricks で構造化ストリーミング クエリを監視する

Azure Databricks では、[ストリーミング] タブの Spark UI で、構造化ストリーミング アプリケーションの監視が組み込みで提供されています。

Spark UI で構造化ストリーミング クエリを区別する

.queryName(<query-name>) コードに writeStream を追加し、ストリームに一意のクエリ名を指定すると、Spark UI でどのメトリックがどのストリームのものであるかを簡単に区別できます。

構造化ストリーミング メトリックを外部サービスにプッシュする

ストリーミング メトリックは、Apache Spark のストリーミング クエリ リスナー インターフェイスを使用して、アラートまたはダッシュボードのユース ケースのために外部サービスにプッシュできます。 Databricks Runtime 11.3 LTS 以降では、StreamingQueryListener は Python と Scala で使用できます。

重要

Unity カタログ対応のコンピューティング アクセス モードを使用するワークロードには、次の制限事項が適用されます。

  • StreamingQueryListener では、Databricks Runtime 15.1 以降で資格情報を使用するか、専用アクセス モードを使用したコンピューティングで Unity カタログによって管理されるオブジェクトと対話する必要があります。
  • StreamingQueryListener には、標準アクセス モード (以前の共有アクセス モード) で構成された Scala ワークロードに対して Databricks Runtime 16.1 以降が必要です。

メモ

リスナーの処理遅延は、クエリの処理速度に大きな影響を与える可能性があります。 これらのリスナーの処理ロジックを制限し、効率を高めるために Kafka のような高速応答システムへの書き込みを選択することをお勧めします。

リスナーを実装するための構文の基本的な例を次のコードに示します。

スカラ (プログラミング言語)

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python(プログラミング言語)

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

構造化ストリーミングでの監視可能なメトリックを定義する

監視可能なメトリックは、クエリ (DataFrame) で定義できる任意の名前付き集計関数です。 DataFrame の実行が完了ポイントに達すると (つまり、バッチ クエリが完了するか、ストリーミング エポックに達すると)、最後の完了ポイント以降に処理されたデータのメトリックを含む名前付きイベントが生成されます。

Spark セッションにリスナーをアタッチすると、これらのメトリックを監視できます。 リスナーは実行モードによって異なります。

  • バッチ モード: QueryExecutionListener を使用します。

    QueryExecutionListener は、クエリの完了時に呼び出されます。 QueryExecution.observedMetrics マップを使用してメトリックにアクセスします。

  • ストリーミング、またはマイクロバッチ: StreamingQueryListenerを使用します。

    StreamingQueryListener は、ストリーミング クエリがエポックを完了したときに呼び出されます。 StreamingQueryProgress.observedMetrics マップを使用してメトリックにアクセスします。 Azure Databricks では、ストリーミング用の continuous トリガー モードはサポートされていません。

次に例を示します。

スカラ (プログラミング言語)

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python(プログラミング言語)

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Unity カタログ、Delta Lake、および Structured Streaming のメトリック テーブル識別子をマップする

構造化ストリーミング メトリックでは、ストリーミング クエリのソースとして使用される Delta テーブルの一意の ID に対して、 reservoirId フィールドが複数の場所で使用されます。

reservoirId フィールドは、Delta テーブルによって Delta トランザクション ログに格納されている一意の識別子をマップします。 この ID は、 Unity カタログによって割り当てられ、カタログ エクスプローラーに表示される tableId 値にはマップされません。

Delta テーブルのテーブル識別子を確認するには、次の構文を使用します。 これは、Unity カタログのマネージド テーブル、Unity カタログの外部テーブル、およびすべての Hive メタストア デルタ テーブルで機能します。

DESCRIBE DETAIL <table-name>

結果に表示される id フィールドは、ストリーミング メトリックの reservoirId にマップされる識別子です。

StreamingQueryListener オブジェクトのメトリック

田畑 説明
id 再起動が行われても保持される一意のクエリ ID。
runId 開始/再起動ごとに一意のクエリ ID。 StreamingQuery.runId() に関する説明をご覧ください。
name ユーザーが指定したクエリの名前。 名前が指定されていない場合、名前は null です。
timestamp マイクロバッチの実行のタイムスタンプ。
batchId 処理中のデータのバッチにおける一意のID。 失敗後の再試行の場合、特定のバッチ ID が複数回実行されることがあります。 同様に、処理するデータがない場合、バッチ ID はインクリメントされません。
batchDuration バッチ操作の処理時間 (ミリ秒単位)。
numInputRows トリガーで処理されたレコード数の集計 (すべてのソースが対象)。
inputRowsPerSecond データの到着速度の集計 (すべてのソースが対象)。
processedRowsPerSecond Spark によるデータの処理速度の集計 (すべてのソースが対象)。

StreamingQueryListener また、顧客メトリックとソースの進行状況の詳細を調べることができるオブジェクトを含む次のフィールドも定義します。

田畑 説明
durationMs 次のコマンドを入力します: ju.Map[String, JLong] durationMs オブジェクトを参照してください。
eventTime 次のコマンドを入力します: ju.Map[String, String] eventTime オブジェクトを参照してください
stateOperators 次のコマンドを入力します: Array[StateOperatorProgress] stateOperators オブジェクトを参照してください
sources 次のコマンドを入力します: Array[SourceProgress] ソース オブジェクトを参照してください
sink 次のコマンドを入力します: SinkProgress シンク オブジェクトを参照してください。
observedMetrics 次のコマンドを入力します: ju.Map[String, Row] DataFrame/クエリ ( df.observe など) で定義できる任意の集計関数に名前を付けます。

durationMs オブジェクト

オブジェクトの種類: ju.Map[String, JLong]

マイクロバッチ実行プロセスのさまざまなステージが完了するまでにかかる時間に関する情報。

田畑 説明
durationMs.addBatch マイクロバッチの実行にかかった時間。 これには、Spark がマイクロバッチの計画に要した時間は含まれません。
durationMs.getBatch オフセットに関するメタデータをソースから取得するのにかかる時間。
durationMs.latestOffset マイクロバッチに使われた最新のオフセット。 この進行状況オブジェクトは、ソースから最新のオフセットを取得するのにかかった時間を示します。
durationMs.queryPlanning 実行プランの生成にかかった時間。
durationMs.triggerExecution マイクロバッチの計画と実行にかかる時間。
durationMs.walCommit 使用可能な新しいオフセットをコミットするのにかかった時間。
durationMs.commitBatch addBatch中にシンクに書き込まれたデータのコミットにかかった時間。 コミットをサポートするデータシンクにのみ存在します。
durationMs.commitOffsets バッチをコミット ログにコミットするのにかかった時間。

eventTime オブジェクト

オブジェクトの種類: ju.Map[String, String]

マイクロバッチで処理されているデータ内で示されたイベント時間の値に関する情報。 このデータは、構造化ストリーミング ジョブで定義されているステートフル集計を処理するための状態のトリミング方法を把握するため、ウォーターマークによって使用されます。

田畑 説明
eventTime.avg トリガーで示された平均イベント時間。
eventTime.max トリガーで示された最大イベント時間。
eventTime.min トリガーで示された最小イベント時間。
eventTime.watermark トリガーで使われたウォーターマークの値。

stateOperators オブジェクト

オブジェクトの種類: Array[StateOperatorProgress]stateOperators オブジェクトには、構造化ストリーミング ジョブで定義されているステートフル操作と、そこから生成される集計に関する情報が含まれます。

ストリーム状態演算子の詳細については、「 ステートフル ストリーミングとは」を参照してください。

田畑 説明
stateOperators.operatorName メトリックが関連付けられているステートフル 演算子の名前 ( symmetricHashJoindedupestateStoreSaveなど)。
stateOperators.numRowsTotal ステートフル演算子または集計の結果としての状態にある行の合計数。
stateOperators.numRowsUpdated ステートフル演算子または集計の結果としての状態で更新された行の合計数。
stateOperators.allUpdatesTimeMs このメトリックは現在 Spark では測定できず、今後のアップデートで削除される予定です。
stateOperators.numRowsRemoved ステートフル演算子または集計の結果として状態から削除された行の合計数。
stateOperators.allRemovalsTimeMs このメトリックは現在 Spark では測定できず、今後のアップデートで削除される予定です。
stateOperators.commitTimeMs すべての更新 (書き込みと削除) をコミットし、新しいバージョンを返すのにかかった時間。
stateOperators.memoryUsedBytes 状態ストアによって使われたメモリ。
stateOperators.numRowsDroppedByWatermark ステートフル集計に含めるには遅すぎると見なされた行の数。 ストリーミング集計のみ: 集計後に削除された行の数 (生の入力行ではありません)。 この数は正確ではありませんが、削除されている遅延データがあることを示しています。
stateOperators.numShufflePartitions このステートフル演算子のシャッフル パーティションの数。
stateOperators.numStateStoreInstances 演算子が初期化および保守した実際の状態ストア インスタンス。 多くのステートフル演算子の場合、これはパーティションの数と同じです。 ただし、ストリーム同士の結合では、パーティションごとに 4 つの状態ストア インスタンスを初期化します。
stateOperators.customMetrics 詳細については、このトピックの stateOperators.customMetrics を参照してください。

StateOperatorProgress.customMetrics オブジェクト

オブジェクトの種類: ju.Map[String, JLong]

StateOperatorProgress には、 customMetricsフィールドがあります。このフィールドには、これらのメトリックを収集するときに使用している機能に固有のメトリックが含まれています。

特徴 説明
RocksDB ステートストア RocksDB 状態ストアのメトリック
HDFS 状態ストア HDFS 状態ストアのメトリック
ストリーム重複除去 行重複除去のメトリック
ストリーム集計 行集計のメトリック
ストリーム結合演算子 ストリーム結合演算子のメトリック
transformWithState transformWithState演算子のメトリック

RocksDB 状態ストアのカスタムメトリック

RocksDB から収集された、構造化ストリーミング ジョブに対して保持するステートフル値に関するパフォーマンスと操作についてのメトリックをキャプチャする情報。 詳細については、「Azure Databricks で RocksDB 状態ストアを構成する」を参照してください。

田畑 説明
customMetrics.rocksdbBytesCopied RocksDB ファイル マネージャーによって追跡された、コピーされたバイト数。
customMetrics.rocksdbCommitCheckpointLatency ネイティブ RocksDB のスナップショットを取得し、それをローカル ディレクトリに書き込む時間 (ミリ秒単位)。
customMetrics.rocksdbCompactLatency チェックポイント コミット中の圧縮時間 (ミリ秒単位、オプション)。
customMetrics.rocksdbCommitCompactLatency コミット中の圧縮時間 (ミリ秒単位)。
customMetrics.rocksdbCommitFileSyncLatencyMs ネイティブ RocksDB スナップショットを外部ストレージ (チェックポイントの場所) に同期する時間 (ミリ秒単位)。
customMetrics.rocksdbCommitFlushLatency RocksDB のメモリ内の変更をローカル ディスクにフラッシュする時間 (ミリ秒単位)。
customMetrics.rocksdbCommitPauseLatency 圧縮などのチェックポイントのコミットの一部としてバックグラウンド ワーカー スレッドを停止する時間(ミリ秒単位)。
customMetrics.rocksdbCommitWriteBatchLatency メモリ内構造 (WriteBatch) でのステージされた書き込みをネイティブ RocksDB に適用する時間 (ミリ秒単位)。
customMetrics.rocksdbFilesCopied RocksDB ファイル マネージャーによって追跡された、コピーされたファイルの数。
customMetrics.rocksdbFilesReused RocksDB ファイル マネージャーによって追跡された、再利用されたファイルの数。
customMetrics.rocksdbGetCount get呼び出しの数 (ステージング書き込みに使用されるメモリ内バッチ - getsからのWriteBatchは含まれません)。
customMetrics.rocksdbGetLatency 基になるネイティブ RocksDB::Get 呼び出しの平均時間 (ナノ秒単位)。
customMetrics.rocksdbReadBlockCacheHitCount RocksDB のブロック キャッシュからのキャッシュ ヒットの数。
customMetrics.rocksdbReadBlockCacheMissCount RocksDB のブロック キャッシュ ミスの数。
customMetrics.rocksdbSstFileSize RocksDB インスタンス内のすべての静的並べ替えられたテーブル (SST) ファイルのサイズ。
customMetrics.rocksdbTotalBytesRead get 操作によって読み取られた非圧縮バイト数。
customMetrics.rocksdbTotalBytesWritten put 操作によって書き込まれた非圧縮バイトの合計数。
customMetrics.rocksdbTotalBytesReadThroughIterator 反復子を使用して読み取られた非圧縮データの合計バイト数。 一部のステートフル操作 ( FlatMapGroupsWithState でのタイムアウト処理やウォーターマーク処理など) では、反復子を使用して Azure Databricks にデータを読み取る必要があります。
customMetrics.rocksdbTotalBytesReadByCompaction 圧縮プロセスがディスクから読み取ったバイト数。
customMetrics.rocksdbTotalBytesWrittenByCompaction 圧縮プロセスがディスクに書き込む合計バイト数。
customMetrics.rocksdbTotalCompactionLatencyMs バックグラウンド圧縮や、コミット中に開始されたオプションの圧縮など、RocksDB での圧縮時間 (ミリ秒単位)。
customMetrics.rocksdbTotalFlushLatencyMs バックグラウンド フラッシュを含む合計フラッシュ時間。 フラッシュ操作は、 MemTable がいっぱいになるとストレージにフラッシュされるプロセスです。 MemTables は、データが RocksDB に格納される最初のレベルです。
customMetrics.rocksdbZipFileBytesUncompressed ファイル マネージャーによって報告された、圧縮されていない zip ファイルのサイズ (バイト単位)。 ファイル マネージャーは、物理 SST ファイルのディスク領域の使用と削除を管理します。
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> チェックポイントの場所に保存された RocksDB スナップショットの最新バージョン。 値 "-1" は、スナップショットが保存されていないことを示します。 スナップショットは各状態ストア インスタンスに固有であるため、このメトリックは特定のパーティション ID と状態ストア名に適用されます。
customMetrics.rocksdbPutLatency Put 呼び出しの合計待機時間。
customMetrics.rocksdbPutCount プットオプションの数。
customMetrics.rocksdbWriterStallLatencyMs 圧縮またはフラッシュが完了するまでのライター待機時間。
customMetrics.rocksdbTotalBytesWrittenByFlush フラッシュによって書き込まれた合計バイト数
customMetrics.rocksdbPinnedBlocksMemoryUsage ピン留めされたブロックのメモリ使用量
customMetrics.rocksdbNumInternalColFamiliesKeys 内部列ファミリの内部キーの数
customMetrics.rocksdbNumExternalColumnFamilies 外部列ファミリの数
customMetrics.rocksdbNumInternalColumnFamilies 内部列ファミリの数

HDFS 状態ストアのカスタム メトリック

HDFS ステート ストア プロバイダーの動作と操作に関して収集される情報。

田畑 説明
customMetrics.stateOnCurrentVersionSizeBytes 現在のバージョンに限る状態の推定サイズ。
customMetrics.loadedMapCacheHitCount プロバイダーでキャッシュされた状態に対するキャッシュ ヒットの数。
customMetrics.loadedMapCacheMissCount プロバイダーにキャッシュされている状態のキャッシュミス数。
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> 特定の状態ストア インスタンスのスナップショットの最後にアップロードされたバージョン。

重複排除のカスタムメトリクス

重複除去の動作と操作に関して収集される情報。

田畑 説明
customMetrics.numDroppedDuplicateRows 重複行が削除された数です。
customMetrics.numRowsReadDuringEviction 状態の削除中に読み取られた状態行の数。

集計カスタム メトリック

集計の動作と操作に関して収集される情報。

田畑 説明
customMetrics.numRowsReadDuringEviction 状態の削除中に読み取られた状態行の数。

ストリーム結合カスタム メトリック

ストリーム結合の動作と操作に関して収集される情報。

田畑 説明
customMetrics.skippedNullValueCount nullspark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled に設定されている場合にスキップされたtrue値の数。

transformWithState カスタム メトリック

transformWithState (TWS) の動作と操作に関して収集される情報。 transformWithStateの詳細については、「カスタム ステートフル アプリケーションの構築」を参照してください。

田畑 説明
customMetrics.initialStateProcessingTimeMs すべての初期状態の処理にかかった時間 (ミリ秒)。
customMetrics.numValueStateVars 値状態変数の数。 transformWithStateInPandasにも存在します。
customMetrics.numListStateVars リスト状態変数の数。 transformWithStateInPandasにも存在します。
customMetrics.numMapStateVars マップ状態変数の数。 transformWithStateInPandasにも存在します。
customMetrics.numDeletedStateVars 削除された状態変数の数。 transformWithStateInPandasにも存在します。
customMetrics.timerProcessingTimeMs すべてのタイマーの処理にかかった時間 (ミリ秒)
customMetrics.numRegisteredTimers 登録済みタイマーの数。 transformWithStateInPandasにも存在します。
customMetrics.numDeletedTimers 削除されたタイマーの数。 transformWithStateInPandasにも存在します。
customMetrics.numExpiredTimers 期限切れのタイマーの数。 transformWithStateInPandasにも存在します。
customMetrics.numValueStateWithTTLVars TTL を持つ値状態変数の数。 transformWithStateInPandasにも存在します。
customMetrics.numListStateWithTTLVars TTL を持つリスト状態変数の数。 transformWithStateInPandasにも存在します。
customMetrics.numMapStateWithTTLVars TTL を持つマップ状態変数の数。 transformWithStateInPandasにも存在します。
customMetrics.numValuesRemovedDueToTTLExpiry TTL の有効期限が切れたために削除された値の数。 transformWithStateInPandasにも存在します。
customMetrics.numValuesIncrementallyRemovedDueToTTLExpiry TTL の有効期限が切れたために増分削除された値の数。

ソースオブジェクト

オブジェクトの種類: Array[SourceProgress]

sources オブジェクトには、ストリーミング データ ソースの情報とメトリックが含まれています。

田畑 説明
description ストリーミング データ ソース テーブルの詳細な説明。
startOffset ストリーミング ジョブが開始されたデータ ソース テーブル内の開始オフセット番号。
endOffset マイクロバッチによって処理された最後のオフセット。
latestOffset マイクロバッチによって処理された最新のオフセット。
numInputRows このソースから処理された入力行数。
inputRowsPerSecond このソースからの処理のためにデータが到着する速度 (秒単位)。
processedRowsPerSecond Spark がこのソースからのデータを処理する速度。
metrics 次のコマンドを入力します: ju.Map[String, String] 特定のデータ ソースのカスタム メトリックが含まれています。

Azure Databricks には、次のソース オブジェクト実装が用意されています。

メモ

フォーム sources.<startOffset / endOffset / latestOffset>.* (またはバリエーション) で定義されているフィールドの場合は、指定された子フィールドを含む 3 つの可能なフィールドの 1 つとして解釈します。

  • sources.startOffset.<child-field>
  • sources.endOffset.<child-field>
  • sources.latestOffset.<child-field>

Delta Lake ソース オブジェクト

Delta テーブル ストリーミング データ ソースに使用されるカスタム メトリックの定義。

田畑 説明
sources.description ストリーミング クエリの読み取り元となるソースの説明。 (例: “DeltaSource[table]”)。
sources.<startOffset / endOffset>.sourceVersion このオフセットがエンコードされるシリアル化のバージョン。
sources.<startOffset / endOffset>.reservoirId 読み取られるテーブルの ID。 これは、クエリを再開するときに誤った構成を検出するために使われます。 Unity カタログ、Delta Lake、および Structured Streaming のメトリック テーブル識別子のマップを参照してください。
sources.<startOffset / endOffset>.reservoirVersion 現在処理中のテーブルのバージョン。
sources.<startOffset / endOffset>.index このバージョンの AddFiles のシーケンス内のインデックス。 これは、大きなコミットを複数のバッチに分割するために使われます。 このインデックスは、modificationTimestamppath で並べ替えることによって作成されます。
sources.<startOffset / endOffset>.isStartingVersion 現在のオフセットが、初期データの処理後に発生した変更の処理ではなく、新しいストリーミング クエリの開始をマークするかどうかを示します。 新しいクエリを開始すると、開始時にテーブルに存在するすべてのデータがまず処理され、その後、到着する新しいデータが処理されます。
sources.<startOffset / endOffset / latestOffset>.eventTimeMillis イベント時刻の順序付けに記録されたイベント時間。 処理が保留中の初期スナップショット データのイベント時刻。 イベント時刻の順序で初期スナップショットを処理するときに使用されます。
sources.latestOffset マイクロバッチ クエリによって処理された最新のオフセット。
sources.numInputRows このソースから処理された入力行数。
sources.inputRowsPerSecond このソースからの処理のためにデータが到着する速度。
sources.processedRowsPerSecond Spark がこのソースからのデータを処理する速度。
sources.metrics.numBytesOutstanding 未処理ファイル (RocksDB によって追跡されているファイル) の合計サイズ。 これは、ストリーミング ソースとしての Delta および自動ローダーのバックログ メトリックです。
sources.metrics.numFilesOutstanding 処理される予定の未処理ファイルの数。 これは、ストリーミング ソースとしての Delta および自動ローダーのバックログ メトリックです。

Apache Kafka sources オブジェクト

Apache Kafka ストリーミング データ ソースに使用されるカスタム メトリックの定義。

田畑 説明
sources.description 読み取り元の正確な Kafka トピックを指定する、Kafka ソースの詳細な説明。 (例: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”)。
sources.startOffset ストリーミング ジョブが開始した、Kafka トピック内の開始オフセット番号。
sources.endOffset マイクロバッチによって処理された最後のオフセット。 進行中のマイクロバッチ実行では、これは latestOffset と等しい場合があります。
sources.latestOffset マイクロバッチで計算された最新のオフセット。 マイクロバッチ処理では、調整が発生したときにすべてのオフセットが処理されない可能性があり、その結果、 endOffsetlatestOffset が異なります。
sources.numInputRows このソースから処理された入力行数。
sources.inputRowsPerSecond このソースからの処理のためにデータが到着する速度。
sources.processedRowsPerSecond Spark がこのソースからのデータを処理する速度。
sources.metrics.avgOffsetsBehindLatest サブスクライブされたすべてのトピックの中で、ストリーミング クエリが利用可能な最新のオフセットより後であるオフセットの平均数。
sources.metrics.estimatedTotalBytesBehindLatest クエリ プロセスがサブスクライブされたトピックから消費していない推定バイト数。
sources.metrics.maxOffsetsBehindLatest サブスクライブされたすべてのトピックの中で、ストリーミング クエリが利用可能な最新のオフセットより後であるオフセットの最大数。
sources.metrics.minOffsetsBehindLatest サブスクライブされたすべてのトピックの中で、ストリーミング クエリが利用可能な最新のオフセットより後であるオフセットの最小数。

自動ローダーのソース メトリック

自動ローダー ストリーミング データ ソースに使用されるカスタム メトリックの定義。

田畑 説明
sources.<startOffset / endOffset / latestOffset>.seqNum ファイルが検出された順序で処理される一連のファイルの現在位置。
sources.<startOffset / endOffset / latestOffset>.sourceVersion cloudFiles ソースの実装バージョン。
sources.<startOffset / endOffset / latestOffset>.lastBackfillStartTimeMs 最新のバックフィル操作の開始時刻。
sources.<startOffset / endOffset / latestOffset>.lastBackfillFinishTimeMs 最新のバックフィル操作の終了時刻。
sources.<startOffset / endOffset / latestOffset>.lastInputPath ストリームが再起動される前の、ストリームの最後のユーザー指定入力パス。
sources.metrics.numFilesOutstanding バックログ内のファイルの数
sources.metrics.numBytesOutstanding バックログ内のファイルのサイズ (バイト)
sources.metrics.approximateQueueSize メッセージ キューのおおよそのサイズ。 cloudFiles.useNotifications オプションが有効になっている場合のみ。

PubSub ソースのメトリック

PubSub ストリーミング データ ソースに使用されるカスタム メトリックの定義。 PubSub ストリーミング ソースの監視の詳細については、「ストリーミング メトリックの監視」を参照してください。

田畑 説明
sources.<startOffset / endOffset / latestOffset>.sourceVersion このオフセットがエンコードされる実装バージョン。
sources.<startOffset / endOffset / latestOffset>.seqNum 処理中の保持されたシーケンス番号。
sources.<startOffset / endOffset / latestOffset>.fetchEpoch 処理中の最大フェッチ エポック。
sources.metrics.numRecordsReadyToProcess 現在のバックログで処理できるレコードの数。
sources.metrics.sizeOfRecordsReadyToProcess 現在のバックログ内の未処理データの合計サイズ (バイト単位)。
sources.metrics.numDuplicatesSinceStreamStart 開始後にストリームによって処理された重複レコードの合計数。

パルサー源の指標

Pulsar ストリーミング データ ソースに使用されるカスタム メトリックの定義。

田畑 説明
sources.metrics.numInputRows 現在のマイクロバッチで処理された行の数。
sources.metrics.numInputBytes 現在のマイクロバッチで処理されたバイトの合計数。

シンクオブジェクト

オブジェクトの種類: SinkProgress

田畑 説明
sink.description 使用されている特定のシンク実装の詳細を示すシンクの説明。
sink.numOutputRows 出力行の数。 シンクの種類によって、値の動作や制限が異なる場合があります。 サポートされている特定の種類を確認する
sink.metrics ju.Map[String, String] シンクメトリック。

現在、Azure Databricks には、次の 2 つの特定の sink オブジェクト実装が用意されています。

シンクの種類 詳細
Delta テーブル 「Delta シンク オブジェクト」を参照してください。
Apache Kafka のトピック Kafka シンク オブジェクトを参照してください。

sink.metrics フィールドは、sink オブジェクトの両方のバリアントで同じように動作します。

Delta Lake 出力先オブジェクト

田畑 説明
sink.description Delta シンクの説明。使用されている特定の Delta シンク実装の詳細を示します。 (例: “DeltaSink[table]”)。
sink.numOutputRows Spark は DSv1 シンクの出力行 (Delta Lake シンクの分類) を推論できないため、行数は常に -1 されます。

Apache Kafka シンクオブジェクト

田畑 説明
sink.description ストリーミング クエリが書き込む Kafka シンクの説明。使用されている特定の Kafka シンク実装の詳細を示します。 (例: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”)。
sink.numOutputRows マイクロバッチの一部として出力テーブルまたはシンクに書き込まれた行の数。 状況によっては、この値が "-1" になることがあり、一般に、"不明" と解釈できます。

Kafka 同士の間の StreamingQueryListener イベントの例

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_default" : 1370,
      "SnapshotLastUploaded.partition_1_default" : 1370,
      "SnapshotLastUploaded.partition_2_default" : 1362,
      "SnapshotLastUploaded.partition_3_default" : 1370,
      "SnapshotLastUploaded.partition_4_default" : 1356,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_default" : 1360,
      "SnapshotLastUploaded.partition_1_default" : 1360,
      "SnapshotLastUploaded.partition_2_default" : 1352,
      "SnapshotLastUploaded.partition_3_default" : 1360,
      "SnapshotLastUploaded.partition_4_default" : 1346,
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
      "SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
      "SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
      "SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
      "SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Delta Lake 同士の間の StreamingQueryListener イベントの例

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Kinesis から Delta Lake への StreamingQueryListener イベントの例

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Kafka+Delta Lake から Delta Lake への StreamingQueryListener イベントの一例

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Delta Lake StreamingQueryListener イベントに対するレートソースの例

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}