讀取結構化串流狀態資訊

您可以使用 DataFrame 作業或 SQL 資料表值函式來查詢結構化串流狀態數據和元數據。 使用這些函式來觀察結構化串流具狀態查詢的狀態資訊,這對於監視和偵錯很有用。

您必須具有串流查詢檢查點路徑的讀取權限,才能查詢狀態資料或中繼資料。 本文所述的函式提供狀態資料和中繼資料的唯讀存取。 您僅能使用批次讀取語意來查詢狀態資訊。

注意

您無法查詢 Lakeflow Spark 宣告式管線、串流資料表或具體化檢視的狀態資訊。 您無法使用無伺服器計算或以標準存取模式設定的計算來查詢狀態資訊。

需求

  • 使用下列其中一個計算配置:
    • 以標準存取模式設定的計算上,Databricks Runtime 16.3 和更新版本。
    • Databricks Runtime 14.3 LTS 和更新版本在以專用或無隔離存取模式設定的計算上。
  • 串流查詢所使用的檢查點路徑讀取許可權。

讀取結構化串流狀態存放區

您可讀取任何支援的 Databricks Runtime 中所執行結構化串流查詢的狀態存放區資訊。 使用下列語法:

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

Scala

val df = spark.read
  .format("statestore")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore('/checkpoint/path')

狀態讀取器 API 選項與結構

完整格式選項列表 statestore ,請參見 State store

輸出資料的結構如下:

類型 說明
key 結構 (衍生自狀態索引鍵的進一步類型) 狀態檢查點中具狀態運算子記錄的索引鍵。
value 結構 (衍生自狀態值的進一步類型) 狀態檢查點中狀態運算子記錄的值。
partition_id 整數 包含狀態操作記錄的狀態檢查點分區。

在 Databricks Runtime 16.4 LTS 及以上版本中,當 readChangeFeed 選項設為 true時,輸出資料的結構如下:

類型 說明
batch_id 狀態變更所屬的批次 ID。
change_type 字串 批次套用的變更類型: update 插入與更新、 delete 刪除。
key 結構 (衍生自狀態索引鍵的進一步類型) 狀態檢查點中具狀態運算子記錄的索引鍵。
value 結構 (衍生自狀態值的進一步類型) 狀態檢查點中狀態運算子記錄的值。 null 用於 change_typedelete 的記錄。
partition_id 整數 包含狀態操作記錄的狀態檢查點分區。

請參閱 read_statestore 資料表值函式

閱讀結構化串流狀態變更

可於 Databricks Runtime 16.4 LTS 及更新版本中使用。 若要讀取微批次間的狀態變化,而非在單一微批次查看完整狀態,請設定 readChangeFeedtrue 並指定 changeStartBatchId。 可選擇性地指定 changeEndBatchId。 完整選項清單請參閱 州商店

例如,若要讀取從批次 2 到最新已提交的批次之間的狀態變更:

Python

df = (spark.read
  .format("statestore")
  .option("readChangeFeed", True)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")
)

Scala

val df = spark.read
  .format("statestore")
  .option("readChangeFeed", true)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_statestore(
    '<checkpointLocation>',
    readChangeFeed => true,
    changeStartBatchId => 2
);

輸出結構包含額外 batch_idchange_type 欄位。 完整架構請參閱 狀態讀取器 API 選項與架構

讀取結構化串流狀態中繼資料

可在 Databricks 執行環境 14.3 LTS 或更高版本上取得。 你可以讀取結構化串流查詢的狀態元資料資訊:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

傳回的數據具有下列架構:

類型 說明
operatorId 整數 具狀態串流運算子的整數識別碼。
operatorName 字串 具狀態串流運算子的名稱。
stateStoreName 字串 運算子狀態存放區的名稱。
numPartitions 整數 狀態存放區的分割區數目。
minBatchId 可供查詢狀態的最小批次識別碼。
maxBatchId 可供查詢狀態的最大批次識別碼。

注意

minBatchIdmaxBatchId 提供的批次標識碼值會反映檢查點寫入時的狀態。 舊批次會隨著微批次執行自動清理,因此此處提供的值不保證仍可供使用。

請參閱 read_state_metadata 資料表值函式

範例:查詢數據流聯結的一端

使用下列語法來查詢資料流聯結的左側:

Python

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

範例:查詢具有多個狀態運算元的資料流的狀態儲存區

此範例會使用狀態元數據讀取器來收集具有多個具狀態運算符之串流查詢的元數據詳細數據,然後使用元數據結果作為狀態讀取器的選項。

狀態元數據讀取器會採用檢查點路徑作為唯一選項,如下列語法範例所示:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

Scala

val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

下表代表狀態存放區元資料的範例輸出:

operatorId 操作員名稱 stateStoreName 分區數量 minBatchId maxBatchId
0 狀態存儲保存 預設 200 0 13
1 dedupeWithinWatermark 預設 200 0 13

若要取得 運算符的結果 dedupeWithinWatermark ,請使用 operatorId 選項查詢狀態讀取器,如下列範例所示:

Python

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

Scala

val leftDf = spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path")

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);