Azure Databricks では、[ストリーミング] タブの Spark UI で、構造化ストリーミング アプリケーションの監視が組み込みで提供されています。
Spark UI で構造化ストリーミング クエリを区別する
.queryName(<query-name>)
コードに writeStream
を追加し、ストリームに一意のクエリ名を指定すると、Spark UI でどのメトリックがどのストリームのものであるかを簡単に区別できます。
構造化ストリーミング メトリックを外部サービスにプッシュする
ストリーミング メトリックは、Apache Spark のストリーミング クエリ リスナー インターフェイスを使用して、アラートまたはダッシュボードのユース ケースのために外部サービスにプッシュできます。 Databricks Runtime 11.3 LTS 以降では、StreamingQueryListener
は Python と Scala で使用できます。
重要
Unity カタログ対応のコンピューティング アクセス モードを使用するワークロードには、次の制限事項が適用されます。
-
StreamingQueryListener
では、Databricks Runtime 15.1 以降で資格情報を使用するか、専用アクセス モードを使用したコンピューティングで Unity カタログによって管理されるオブジェクトと対話する必要があります。 -
StreamingQueryListener
には、標準アクセス モード (以前の共有アクセス モード) で構成された Scala ワークロードに対して Databricks Runtime 16.1 以降が必要です。
メモ
リスナーの処理遅延は、クエリの処理速度に大きな影響を与える可能性があります。 これらのリスナーの処理ロジックを制限し、効率を高めるために Kafka のような高速応答システムへの書き込みを選択することをお勧めします。
リスナーを実装するための構文の基本的な例を次のコードに示します。
スカラ (プログラミング言語)
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python(プログラミング言語)
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
構造化ストリーミングでの監視可能なメトリックを定義する
監視可能なメトリックは、クエリ (DataFrame) で定義できる任意の名前付き集計関数です。 DataFrame の実行が完了ポイントに達すると (つまり、バッチ クエリが完了するか、ストリーミング エポックに達すると)、最後の完了ポイント以降に処理されたデータのメトリックを含む名前付きイベントが生成されます。
Spark セッションにリスナーをアタッチすると、これらのメトリックを監視できます。 リスナーは実行モードによって異なります。
バッチ モード:
QueryExecutionListener
を使用します。QueryExecutionListener
は、クエリの完了時に呼び出されます。QueryExecution.observedMetrics
マップを使用してメトリックにアクセスします。ストリーミング、またはマイクロバッチ:
StreamingQueryListener
を使用します。StreamingQueryListener
は、ストリーミング クエリがエポックを完了したときに呼び出されます。StreamingQueryProgress.observedMetrics
マップを使用してメトリックにアクセスします。 Azure Databricks では、ストリーミング用のcontinuous
トリガー モードはサポートされていません。
次に例を示します。
スカラ (プログラミング言語)
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python(プログラミング言語)
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Unity カタログ、Delta Lake、および Structured Streaming のメトリック テーブル識別子をマップする
構造化ストリーミング メトリックでは、ストリーミング クエリのソースとして使用される Delta テーブルの一意の ID に対して、 reservoirId
フィールドが複数の場所で使用されます。
reservoirId
フィールドは、Delta テーブルによって Delta トランザクション ログに格納されている一意の識別子をマップします。 この ID は、 Unity カタログによって割り当てられ、カタログ エクスプローラーに表示される tableId
値にはマップされません。
Delta テーブルのテーブル識別子を確認するには、次の構文を使用します。 これは、Unity カタログのマネージド テーブル、Unity カタログの外部テーブル、およびすべての Hive メタストア デルタ テーブルで機能します。
DESCRIBE DETAIL <table-name>
結果に表示される id
フィールドは、ストリーミング メトリックの reservoirId
にマップされる識別子です。
StreamingQueryListener オブジェクトのメトリック
田畑 | 説明 |
---|---|
id |
再起動が行われても保持される一意のクエリ ID。 |
runId |
開始/再起動ごとに一意のクエリ ID。 StreamingQuery.runId() に関する説明をご覧ください。 |
name |
ユーザーが指定したクエリの名前。 名前が指定されていない場合、名前は null です。 |
timestamp |
マイクロバッチの実行のタイムスタンプ。 |
batchId |
処理中のデータのバッチにおける一意のID。 失敗後の再試行の場合、特定のバッチ ID が複数回実行されることがあります。 同様に、処理するデータがない場合、バッチ ID はインクリメントされません。 |
batchDuration |
バッチ操作の処理時間 (ミリ秒単位)。 |
numInputRows |
トリガーで処理されたレコード数の集計 (すべてのソースが対象)。 |
inputRowsPerSecond |
データの到着速度の集計 (すべてのソースが対象)。 |
processedRowsPerSecond |
Spark によるデータの処理速度の集計 (すべてのソースが対象)。 |
StreamingQueryListener
また、顧客メトリックとソースの進行状況の詳細を調べることができるオブジェクトを含む次のフィールドも定義します。
田畑 | 説明 |
---|---|
durationMs |
次のコマンドを入力します: ju.Map[String, JLong]
durationMs オブジェクトを参照してください。 |
eventTime |
次のコマンドを入力します: ju.Map[String, String]
eventTime オブジェクトを参照してください。 |
stateOperators |
次のコマンドを入力します: Array[StateOperatorProgress]
stateOperators オブジェクトを参照してください。 |
sources |
次のコマンドを入力します: Array[SourceProgress]
ソース オブジェクトを参照してください。 |
sink |
次のコマンドを入力します: SinkProgress
シンク オブジェクトを参照してください。 |
observedMetrics |
次のコマンドを入力します: ju.Map[String, Row] DataFrame/クエリ ( df.observe など) で定義できる任意の集計関数に名前を付けます。 |
durationMs オブジェクト
オブジェクトの種類: ju.Map[String, JLong]
マイクロバッチ実行プロセスのさまざまなステージが完了するまでにかかる時間に関する情報。
田畑 | 説明 |
---|---|
durationMs.addBatch |
マイクロバッチの実行にかかった時間。 これには、Spark がマイクロバッチの計画に要した時間は含まれません。 |
durationMs.getBatch |
オフセットに関するメタデータをソースから取得するのにかかる時間。 |
durationMs.latestOffset |
マイクロバッチに使われた最新のオフセット。 この進行状況オブジェクトは、ソースから最新のオフセットを取得するのにかかった時間を示します。 |
durationMs.queryPlanning |
実行プランの生成にかかった時間。 |
durationMs.triggerExecution |
マイクロバッチの計画と実行にかかる時間。 |
durationMs.walCommit |
使用可能な新しいオフセットをコミットするのにかかった時間。 |
durationMs.commitBatch |
addBatch 中にシンクに書き込まれたデータのコミットにかかった時間。 コミットをサポートするデータシンクにのみ存在します。 |
durationMs.commitOffsets |
バッチをコミット ログにコミットするのにかかった時間。 |
eventTime オブジェクト
オブジェクトの種類: ju.Map[String, String]
マイクロバッチで処理されているデータ内で示されたイベント時間の値に関する情報。 このデータは、構造化ストリーミング ジョブで定義されているステートフル集計を処理するための状態のトリミング方法を把握するため、ウォーターマークによって使用されます。
田畑 | 説明 |
---|---|
eventTime.avg |
トリガーで示された平均イベント時間。 |
eventTime.max |
トリガーで示された最大イベント時間。 |
eventTime.min |
トリガーで示された最小イベント時間。 |
eventTime.watermark |
トリガーで使われたウォーターマークの値。 |
stateOperators オブジェクト
オブジェクトの種類: Array[StateOperatorProgress]
stateOperators
オブジェクトには、構造化ストリーミング ジョブで定義されているステートフル操作と、そこから生成される集計に関する情報が含まれます。
ストリーム状態演算子の詳細については、「 ステートフル ストリーミングとは」を参照してください。
田畑 | 説明 |
---|---|
stateOperators.operatorName |
メトリックが関連付けられているステートフル 演算子の名前 ( symmetricHashJoin 、 dedupe 、 stateStoreSave など)。 |
stateOperators.numRowsTotal |
ステートフル演算子または集計の結果としての状態にある行の合計数。 |
stateOperators.numRowsUpdated |
ステートフル演算子または集計の結果としての状態で更新された行の合計数。 |
stateOperators.allUpdatesTimeMs |
このメトリックは現在 Spark では測定できず、今後のアップデートで削除される予定です。 |
stateOperators.numRowsRemoved |
ステートフル演算子または集計の結果として状態から削除された行の合計数。 |
stateOperators.allRemovalsTimeMs |
このメトリックは現在 Spark では測定できず、今後のアップデートで削除される予定です。 |
stateOperators.commitTimeMs |
すべての更新 (書き込みと削除) をコミットし、新しいバージョンを返すのにかかった時間。 |
stateOperators.memoryUsedBytes |
状態ストアによって使われたメモリ。 |
stateOperators.numRowsDroppedByWatermark |
ステートフル集計に含めるには遅すぎると見なされた行の数。 ストリーミング集計のみ: 集計後に削除された行の数 (生の入力行ではありません)。 この数は正確ではありませんが、削除されている遅延データがあることを示しています。 |
stateOperators.numShufflePartitions |
このステートフル演算子のシャッフル パーティションの数。 |
stateOperators.numStateStoreInstances |
演算子が初期化および保守した実際の状態ストア インスタンス。 多くのステートフル演算子の場合、これはパーティションの数と同じです。 ただし、ストリーム同士の結合では、パーティションごとに 4 つの状態ストア インスタンスを初期化します。 |
stateOperators.customMetrics |
詳細については、このトピックの stateOperators.customMetrics を参照してください。 |
StateOperatorProgress.customMetrics オブジェクト
オブジェクトの種類: ju.Map[String, JLong]
StateOperatorProgress
には、 customMetrics
フィールドがあります。このフィールドには、これらのメトリックを収集するときに使用している機能に固有のメトリックが含まれています。
特徴 | 説明 |
---|---|
RocksDB ステートストア | RocksDB 状態ストアのメトリック。 |
HDFS 状態ストア | HDFS 状態ストアのメトリック。 |
ストリーム重複除去 | 行重複除去のメトリック。 |
ストリーム集計 | 行集計のメトリック。 |
ストリーム結合演算子 | ストリーム結合演算子のメトリック。 |
transformWithState |
transformWithState 演算子のメトリック。 |
RocksDB 状態ストアのカスタムメトリック
RocksDB から収集された、構造化ストリーミング ジョブに対して保持するステートフル値に関するパフォーマンスと操作についてのメトリックをキャプチャする情報。 詳細については、「Azure Databricks で RocksDB 状態ストアを構成する」を参照してください。
田畑 | 説明 |
---|---|
customMetrics.rocksdbBytesCopied |
RocksDB ファイル マネージャーによって追跡された、コピーされたバイト数。 |
customMetrics.rocksdbCommitCheckpointLatency |
ネイティブ RocksDB のスナップショットを取得し、それをローカル ディレクトリに書き込む時間 (ミリ秒単位)。 |
customMetrics.rocksdbCompactLatency |
チェックポイント コミット中の圧縮時間 (ミリ秒単位、オプション)。 |
customMetrics.rocksdbCommitCompactLatency |
コミット中の圧縮時間 (ミリ秒単位)。 |
customMetrics.rocksdbCommitFileSyncLatencyMs |
ネイティブ RocksDB スナップショットを外部ストレージ (チェックポイントの場所) に同期する時間 (ミリ秒単位)。 |
customMetrics.rocksdbCommitFlushLatency |
RocksDB のメモリ内の変更をローカル ディスクにフラッシュする時間 (ミリ秒単位)。 |
customMetrics.rocksdbCommitPauseLatency |
圧縮などのチェックポイントのコミットの一部としてバックグラウンド ワーカー スレッドを停止する時間(ミリ秒単位)。 |
customMetrics.rocksdbCommitWriteBatchLatency |
メモリ内構造 (WriteBatch ) でのステージされた書き込みをネイティブ RocksDB に適用する時間 (ミリ秒単位)。 |
customMetrics.rocksdbFilesCopied |
RocksDB ファイル マネージャーによって追跡された、コピーされたファイルの数。 |
customMetrics.rocksdbFilesReused |
RocksDB ファイル マネージャーによって追跡された、再利用されたファイルの数。 |
customMetrics.rocksdbGetCount |
get 呼び出しの数 (ステージング書き込みに使用されるメモリ内バッチ - gets からのWriteBatch は含まれません)。 |
customMetrics.rocksdbGetLatency |
基になるネイティブ RocksDB::Get 呼び出しの平均時間 (ナノ秒単位)。 |
customMetrics.rocksdbReadBlockCacheHitCount |
RocksDB のブロック キャッシュからのキャッシュ ヒットの数。 |
customMetrics.rocksdbReadBlockCacheMissCount |
RocksDB のブロック キャッシュ ミスの数。 |
customMetrics.rocksdbSstFileSize |
RocksDB インスタンス内のすべての静的並べ替えられたテーブル (SST) ファイルのサイズ。 |
customMetrics.rocksdbTotalBytesRead |
get 操作によって読み取られた非圧縮バイト数。 |
customMetrics.rocksdbTotalBytesWritten |
put 操作によって書き込まれた非圧縮バイトの合計数。 |
customMetrics.rocksdbTotalBytesReadThroughIterator |
反復子を使用して読み取られた非圧縮データの合計バイト数。 一部のステートフル操作 ( FlatMapGroupsWithState でのタイムアウト処理やウォーターマーク処理など) では、反復子を使用して Azure Databricks にデータを読み取る必要があります。 |
customMetrics.rocksdbTotalBytesReadByCompaction |
圧縮プロセスがディスクから読み取ったバイト数。 |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
圧縮プロセスがディスクに書き込む合計バイト数。 |
customMetrics.rocksdbTotalCompactionLatencyMs |
バックグラウンド圧縮や、コミット中に開始されたオプションの圧縮など、RocksDB での圧縮時間 (ミリ秒単位)。 |
customMetrics.rocksdbTotalFlushLatencyMs |
バックグラウンド フラッシュを含む合計フラッシュ時間。 フラッシュ操作は、 MemTable がいっぱいになるとストレージにフラッシュされるプロセスです。
MemTables は、データが RocksDB に格納される最初のレベルです。 |
customMetrics.rocksdbZipFileBytesUncompressed |
ファイル マネージャーによって報告された、圧縮されていない zip ファイルのサイズ (バイト単位)。 ファイル マネージャーは、物理 SST ファイルのディスク領域の使用と削除を管理します。 |
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> |
チェックポイントの場所に保存された RocksDB スナップショットの最新バージョン。 値 "-1" は、スナップショットが保存されていないことを示します。 スナップショットは各状態ストア インスタンスに固有であるため、このメトリックは特定のパーティション ID と状態ストア名に適用されます。 |
customMetrics.rocksdbPutLatency |
Put 呼び出しの合計待機時間。 |
customMetrics.rocksdbPutCount |
プットオプションの数。 |
customMetrics.rocksdbWriterStallLatencyMs |
圧縮またはフラッシュが完了するまでのライター待機時間。 |
customMetrics.rocksdbTotalBytesWrittenByFlush |
フラッシュによって書き込まれた合計バイト数 |
customMetrics.rocksdbPinnedBlocksMemoryUsage |
ピン留めされたブロックのメモリ使用量 |
customMetrics.rocksdbNumInternalColFamiliesKeys |
内部列ファミリの内部キーの数 |
customMetrics.rocksdbNumExternalColumnFamilies |
外部列ファミリの数 |
customMetrics.rocksdbNumInternalColumnFamilies |
内部列ファミリの数 |
HDFS 状態ストアのカスタム メトリック
HDFS ステート ストア プロバイダーの動作と操作に関して収集される情報。
田畑 | 説明 |
---|---|
customMetrics.stateOnCurrentVersionSizeBytes |
現在のバージョンに限る状態の推定サイズ。 |
customMetrics.loadedMapCacheHitCount |
プロバイダーでキャッシュされた状態に対するキャッシュ ヒットの数。 |
customMetrics.loadedMapCacheMissCount |
プロバイダーにキャッシュされている状態のキャッシュミス数。 |
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> |
特定の状態ストア インスタンスのスナップショットの最後にアップロードされたバージョン。 |
重複排除のカスタムメトリクス
重複除去の動作と操作に関して収集される情報。
田畑 | 説明 |
---|---|
customMetrics.numDroppedDuplicateRows |
重複行が削除された数です。 |
customMetrics.numRowsReadDuringEviction |
状態の削除中に読み取られた状態行の数。 |
集計カスタム メトリック
集計の動作と操作に関して収集される情報。
田畑 | 説明 |
---|---|
customMetrics.numRowsReadDuringEviction |
状態の削除中に読み取られた状態行の数。 |
ストリーム結合カスタム メトリック
ストリーム結合の動作と操作に関して収集される情報。
田畑 | 説明 |
---|---|
customMetrics.skippedNullValueCount |
null が spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled に設定されている場合にスキップされたtrue 値の数。 |
transformWithState カスタム メトリック
transformWithState
(TWS) の動作と操作に関して収集される情報。
transformWithState
の詳細については、「カスタム ステートフル アプリケーションの構築」を参照してください。
田畑 | 説明 |
---|---|
customMetrics.initialStateProcessingTimeMs |
すべての初期状態の処理にかかった時間 (ミリ秒)。 |
customMetrics.numValueStateVars |
値状態変数の数。
transformWithStateInPandas にも存在します。 |
customMetrics.numListStateVars |
リスト状態変数の数。
transformWithStateInPandas にも存在します。 |
customMetrics.numMapStateVars |
マップ状態変数の数。
transformWithStateInPandas にも存在します。 |
customMetrics.numDeletedStateVars |
削除された状態変数の数。
transformWithStateInPandas にも存在します。 |
customMetrics.timerProcessingTimeMs |
すべてのタイマーの処理にかかった時間 (ミリ秒) |
customMetrics.numRegisteredTimers |
登録済みタイマーの数。
transformWithStateInPandas にも存在します。 |
customMetrics.numDeletedTimers |
削除されたタイマーの数。
transformWithStateInPandas にも存在します。 |
customMetrics.numExpiredTimers |
期限切れのタイマーの数。
transformWithStateInPandas にも存在します。 |
customMetrics.numValueStateWithTTLVars |
TTL を持つ値状態変数の数。
transformWithStateInPandas にも存在します。 |
customMetrics.numListStateWithTTLVars |
TTL を持つリスト状態変数の数。
transformWithStateInPandas にも存在します。 |
customMetrics.numMapStateWithTTLVars |
TTL を持つマップ状態変数の数。
transformWithStateInPandas にも存在します。 |
customMetrics.numValuesRemovedDueToTTLExpiry |
TTL の有効期限が切れたために削除された値の数。
transformWithStateInPandas にも存在します。 |
customMetrics.numValuesIncrementallyRemovedDueToTTLExpiry |
TTL の有効期限が切れたために増分削除された値の数。 |
ソースオブジェクト
オブジェクトの種類: Array[SourceProgress]
sources
オブジェクトには、ストリーミング データ ソースの情報とメトリックが含まれています。
田畑 | 説明 |
---|---|
description |
ストリーミング データ ソース テーブルの詳細な説明。 |
startOffset |
ストリーミング ジョブが開始されたデータ ソース テーブル内の開始オフセット番号。 |
endOffset |
マイクロバッチによって処理された最後のオフセット。 |
latestOffset |
マイクロバッチによって処理された最新のオフセット。 |
numInputRows |
このソースから処理された入力行数。 |
inputRowsPerSecond |
このソースからの処理のためにデータが到着する速度 (秒単位)。 |
processedRowsPerSecond |
Spark がこのソースからのデータを処理する速度。 |
metrics |
次のコマンドを入力します: ju.Map[String, String] 特定のデータ ソースのカスタム メトリックが含まれています。 |
Azure Databricks には、次のソース オブジェクト実装が用意されています。
メモ
フォーム sources.<startOffset / endOffset / latestOffset>.*
(またはバリエーション) で定義されているフィールドの場合は、指定された子フィールドを含む 3 つの可能なフィールドの 1 つとして解釈します。
sources.startOffset.<child-field>
sources.endOffset.<child-field>
sources.latestOffset.<child-field>
Delta Lake ソース オブジェクト
Delta テーブル ストリーミング データ ソースに使用されるカスタム メトリックの定義。
田畑 | 説明 |
---|---|
sources.description |
ストリーミング クエリの読み取り元となるソースの説明。 (例: “DeltaSource[table]” )。 |
sources.<startOffset / endOffset>.sourceVersion |
このオフセットがエンコードされるシリアル化のバージョン。 |
sources.<startOffset / endOffset>.reservoirId |
読み取られるテーブルの ID。 これは、クエリを再開するときに誤った構成を検出するために使われます。 Unity カタログ、Delta Lake、および Structured Streaming のメトリック テーブル識別子のマップを参照してください。 |
sources.<startOffset / endOffset>.reservoirVersion |
現在処理中のテーブルのバージョン。 |
sources.<startOffset / endOffset>.index |
このバージョンの AddFiles のシーケンス内のインデックス。 これは、大きなコミットを複数のバッチに分割するために使われます。 このインデックスは、modificationTimestamp と path で並べ替えることによって作成されます。 |
sources.<startOffset / endOffset>.isStartingVersion |
現在のオフセットが、初期データの処理後に発生した変更の処理ではなく、新しいストリーミング クエリの開始をマークするかどうかを示します。 新しいクエリを開始すると、開始時にテーブルに存在するすべてのデータがまず処理され、その後、到着する新しいデータが処理されます。 |
sources.<startOffset / endOffset / latestOffset>.eventTimeMillis |
イベント時刻の順序付けに記録されたイベント時間。 処理が保留中の初期スナップショット データのイベント時刻。 イベント時刻の順序で初期スナップショットを処理するときに使用されます。 |
sources.latestOffset |
マイクロバッチ クエリによって処理された最新のオフセット。 |
sources.numInputRows |
このソースから処理された入力行数。 |
sources.inputRowsPerSecond |
このソースからの処理のためにデータが到着する速度。 |
sources.processedRowsPerSecond |
Spark がこのソースからのデータを処理する速度。 |
sources.metrics.numBytesOutstanding |
未処理ファイル (RocksDB によって追跡されているファイル) の合計サイズ。 これは、ストリーミング ソースとしての Delta および自動ローダーのバックログ メトリックです。 |
sources.metrics.numFilesOutstanding |
処理される予定の未処理ファイルの数。 これは、ストリーミング ソースとしての Delta および自動ローダーのバックログ メトリックです。 |
Apache Kafka sources オブジェクト
Apache Kafka ストリーミング データ ソースに使用されるカスタム メトリックの定義。
田畑 | 説明 |
---|---|
sources.description |
読み取り元の正確な Kafka トピックを指定する、Kafka ソースの詳細な説明。 (例: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]” )。 |
sources.startOffset |
ストリーミング ジョブが開始した、Kafka トピック内の開始オフセット番号。 |
sources.endOffset |
マイクロバッチによって処理された最後のオフセット。 進行中のマイクロバッチ実行では、これは latestOffset と等しい場合があります。 |
sources.latestOffset |
マイクロバッチで計算された最新のオフセット。 マイクロバッチ処理では、調整が発生したときにすべてのオフセットが処理されない可能性があり、その結果、 endOffset と latestOffset が異なります。 |
sources.numInputRows |
このソースから処理された入力行数。 |
sources.inputRowsPerSecond |
このソースからの処理のためにデータが到着する速度。 |
sources.processedRowsPerSecond |
Spark がこのソースからのデータを処理する速度。 |
sources.metrics.avgOffsetsBehindLatest |
サブスクライブされたすべてのトピックの中で、ストリーミング クエリが利用可能な最新のオフセットより後であるオフセットの平均数。 |
sources.metrics.estimatedTotalBytesBehindLatest |
クエリ プロセスがサブスクライブされたトピックから消費していない推定バイト数。 |
sources.metrics.maxOffsetsBehindLatest |
サブスクライブされたすべてのトピックの中で、ストリーミング クエリが利用可能な最新のオフセットより後であるオフセットの最大数。 |
sources.metrics.minOffsetsBehindLatest |
サブスクライブされたすべてのトピックの中で、ストリーミング クエリが利用可能な最新のオフセットより後であるオフセットの最小数。 |
自動ローダーのソース メトリック
自動ローダー ストリーミング データ ソースに使用されるカスタム メトリックの定義。
田畑 | 説明 |
---|---|
sources.<startOffset / endOffset / latestOffset>.seqNum |
ファイルが検出された順序で処理される一連のファイルの現在位置。 |
sources.<startOffset / endOffset / latestOffset>.sourceVersion |
cloudFiles ソースの実装バージョン。 |
sources.<startOffset / endOffset / latestOffset>.lastBackfillStartTimeMs |
最新のバックフィル操作の開始時刻。 |
sources.<startOffset / endOffset / latestOffset>.lastBackfillFinishTimeMs |
最新のバックフィル操作の終了時刻。 |
sources.<startOffset / endOffset / latestOffset>.lastInputPath |
ストリームが再起動される前の、ストリームの最後のユーザー指定入力パス。 |
sources.metrics.numFilesOutstanding |
バックログ内のファイルの数 |
sources.metrics.numBytesOutstanding |
バックログ内のファイルのサイズ (バイト) |
sources.metrics.approximateQueueSize |
メッセージ キューのおおよそのサイズ。 cloudFiles.useNotifications オプションが有効になっている場合のみ。 |
PubSub ソースのメトリック
PubSub ストリーミング データ ソースに使用されるカスタム メトリックの定義。 PubSub ストリーミング ソースの監視の詳細については、「ストリーミング メトリックの監視」を参照してください。
田畑 | 説明 |
---|---|
sources.<startOffset / endOffset / latestOffset>.sourceVersion |
このオフセットがエンコードされる実装バージョン。 |
sources.<startOffset / endOffset / latestOffset>.seqNum |
処理中の保持されたシーケンス番号。 |
sources.<startOffset / endOffset / latestOffset>.fetchEpoch |
処理中の最大フェッチ エポック。 |
sources.metrics.numRecordsReadyToProcess |
現在のバックログで処理できるレコードの数。 |
sources.metrics.sizeOfRecordsReadyToProcess |
現在のバックログ内の未処理データの合計サイズ (バイト単位)。 |
sources.metrics.numDuplicatesSinceStreamStart |
開始後にストリームによって処理された重複レコードの合計数。 |
パルサー源の指標
Pulsar ストリーミング データ ソースに使用されるカスタム メトリックの定義。
田畑 | 説明 |
---|---|
sources.metrics.numInputRows |
現在のマイクロバッチで処理された行の数。 |
sources.metrics.numInputBytes |
現在のマイクロバッチで処理されたバイトの合計数。 |
シンクオブジェクト
オブジェクトの種類: SinkProgress
田畑 | 説明 |
---|---|
sink.description |
使用されている特定のシンク実装の詳細を示すシンクの説明。 |
sink.numOutputRows |
出力行の数。 シンクの種類によって、値の動作や制限が異なる場合があります。 サポートされている特定の種類を確認する |
sink.metrics |
ju.Map[String, String] シンクメトリック。 |
現在、Azure Databricks には、次の 2 つの特定の sink
オブジェクト実装が用意されています。
シンクの種類 | 詳細 |
---|---|
Delta テーブル | 「Delta シンク オブジェクト」を参照してください。 |
Apache Kafka のトピック | Kafka シンク オブジェクトを参照してください。 |
sink.metrics
フィールドは、sink
オブジェクトの両方のバリアントで同じように動作します。
Delta Lake 出力先オブジェクト
田畑 | 説明 |
---|---|
sink.description |
Delta シンクの説明。使用されている特定の Delta シンク実装の詳細を示します。 (例: “DeltaSink[table]” )。 |
sink.numOutputRows |
Spark は DSv1 シンクの出力行 (Delta Lake シンクの分類) を推論できないため、行数は常に -1 されます。 |
Apache Kafka シンクオブジェクト
田畑 | 説明 |
---|---|
sink.description |
ストリーミング クエリが書き込む Kafka シンクの説明。使用されている特定の Kafka シンク実装の詳細を示します。 (例: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100” )。 |
sink.numOutputRows |
マイクロバッチの一部として出力テーブルまたはシンクに書き込まれた行の数。 状況によっては、この値が "-1" になることがあり、一般に、"不明" と解釈できます。 |
例
Kafka 同士の間の StreamingQueryListener イベントの例
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1370,
"SnapshotLastUploaded.partition_1_default" : 1370,
"SnapshotLastUploaded.partition_2_default" : 1362,
"SnapshotLastUploaded.partition_3_default" : 1370,
"SnapshotLastUploaded.partition_4_default" : 1356,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1360,
"SnapshotLastUploaded.partition_1_default" : 1360,
"SnapshotLastUploaded.partition_2_default" : 1352,
"SnapshotLastUploaded.partition_3_default" : 1360,
"SnapshotLastUploaded.partition_4_default" : 1346,
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
"SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
"SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
"SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
"SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Delta Lake 同士の間の StreamingQueryListener イベントの例
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Kinesis から Delta Lake への StreamingQueryListener イベントの例
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Kafka+Delta Lake から Delta Lake への StreamingQueryListener イベントの一例
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Delta Lake StreamingQueryListener イベントに対するレートソースの例
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}