構造化ストリーミング状態情報の読み取り

重要

この機能はパブリック プレビュー段階にあります。

Databricks Runtime 14.3 LTS 以降では、DataFrame 操作または SQL テーブル値関数を使って、構造化ストリーミング状態データおよびメタデータのクエリを実行できます。 これらの関数を使うと、構造化ストリーミングのステートフル クエリの状態情報を確認でき、監視やデバッグに役立ちます。

状態データまたはメタデータのクエリを実行するには、ストリーミングのクエリに対するチェックポイント パスの読み取りアクセス権が必要です。 この記事で説明する関数は、状態データとメタデータへの読み取り専用アクセスを提供します。 状態情報のクエリには、バッチ読み取りセマンティクスのみを使用できます。

Note

Delta Live Tables パイプライン、ストリーミング テーブル、または具体化されたビューの状態情報のクエリを実行することはできません。

構造化ストリーミングの状態ストアの読み取り

サポートされている Databricks Runtime で実行される構造化ストリーミングのクエリの状態ストア情報を読み取ることができます。 次の構文を使用します。

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore('/checkpoint/path')

次のオプション構成がサポートされています。

オプション Type 既定値
batchId Long 最新のバッチ ID
operatorId Long 0
storeName String “DEFAULT”
joinSide String ("left" または "right") 読み取り対象のサイドを表します。 このオプションは、ユーザーがストリーム - ストリーム結合から状態を読み取りたい場合に使われます。

返されるデータには次のスキーマがあります。

タイプ 説明
key Struct (状態キーから派生した型) 状態チェックポイント内のステートフル演算子のレコードのキー。
value Struct (状態値から派生した型) 状態チェックポイント内のステートフル演算子のレコードの値。
partition_id 整数型 ステートフル演算子のレコードを含む状態チェックポイントのパーティション。

構造化ストリーミング状態のメタデータの読み取り

重要

状態メタデータを記録するには、Databricks Runtime 14.2 以降でストリーミングのクエリを実行する必要があります。 状態メタデータ ファイルは、下位互換性を損ないません。 Databricks Runtime 14.1 以前でストリーミングのクエリを実行することを選んだ場合、既存の状態メタデータ ファイルは無視され、新しい状態メタデータ ファイルは書き込まれません。

Databricks Runtime 14.2 以降で実行される構造化ストリーミングのクエリの状態メタデータ情報を読み取ることができます。 次の構文を使用します。

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

返されるデータには次のスキーマがあります。

タイプ Description
operatorId 整数 ステートフル ストリーミング演算子の整数 ID。
operatorName 整数型 ステートフル ストリーミング演算子の名前。
stateStoreName String 演算子の状態ストアの名前。
numPartitions 整数型 状態ストアのパーティションの数。
minBatchId Long 状態のクエリに使用できる最小バッチ ID。
maxBatchId Long 状態のクエリに使用できる最大バッチ ID。

Note

minBatchIdmaxBatchId によって指定されるバッチ ID 値は、チェックポイントが書き込まれた時点の状態を反映しています。 古いバッチはマイクロ バッチの実行によって自動的にクリーンアップされるため、ここで指定された値が引き続き使用できるとは限りません。