您可以使用 DataFrame 作業或 SQL 資料表值函式來查詢結構化串流狀態數據和元數據。 使用這些函式來觀察結構化串流具狀態查詢的狀態資訊,這對於監視和偵錯很有用。
您必須具有串流查詢檢查點路徑的讀取權限,才能查詢狀態資料或中繼資料。 本文所述的函式提供狀態資料和中繼資料的唯讀存取。 您僅能使用批次讀取語意來查詢狀態資訊。
注意
您無法查詢 Lakeflow Spark 宣告式管線、串流資料表或具體化檢視的狀態資訊。 您無法使用無伺服器計算或以標準存取模式設定的計算來查詢狀態資訊。
需求
- 使用下列其中一個計算配置:
- 以標準存取模式設定的計算上,Databricks Runtime 16.3 和更新版本。
- Databricks Runtime 14.3 LTS 和更新版本在以專用或無隔離存取模式設定的計算上。
- 串流查詢所使用的檢查點路徑讀取許可權。
讀取結構化串流狀態存放區
您可讀取任何支援的 Databricks Runtime 中所執行結構化串流查詢的狀態存放區資訊。 使用下列語法:
Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
Scala
val df = spark.read
.format("statestore")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore('/checkpoint/path')
狀態讀取器 API 選項與結構
完整格式選項列表 statestore ,請參見 State store。
輸出資料的結構如下:
| 欄 | 類型 | 說明 |
|---|---|---|
key |
結構 (衍生自狀態索引鍵的進一步類型) | 狀態檢查點中具狀態運算子記錄的索引鍵。 |
value |
結構 (衍生自狀態值的進一步類型) | 狀態檢查點中狀態運算子記錄的值。 |
partition_id |
整數 | 包含狀態操作記錄的狀態檢查點分區。 |
在 Databricks Runtime 16.4 LTS 及以上版本中,當 readChangeFeed 選項設為 true時,輸出資料的結構如下:
| 欄 | 類型 | 說明 |
|---|---|---|
batch_id |
長 | 狀態變更所屬的批次 ID。 |
change_type |
字串 | 批次套用的變更類型: update 插入與更新、 delete 刪除。 |
key |
結構 (衍生自狀態索引鍵的進一步類型) | 狀態檢查點中具狀態運算子記錄的索引鍵。 |
value |
結構 (衍生自狀態值的進一步類型) | 狀態檢查點中狀態運算子記錄的值。
null 用於 change_type 為 delete 的記錄。 |
partition_id |
整數 | 包含狀態操作記錄的狀態檢查點分區。 |
閱讀結構化串流狀態變更
可於 Databricks Runtime 16.4 LTS 及更新版本中使用。 若要讀取微批次間的狀態變化,而非在單一微批次查看完整狀態,請設定 readChangeFeed 為 true 並指定 changeStartBatchId。 可選擇性地指定 changeEndBatchId。 完整選項清單請參閱 州商店。
例如,若要讀取從批次 2 到最新已提交的批次之間的狀態變更:
Python
df = (spark.read
.format("statestore")
.option("readChangeFeed", True)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
)
Scala
val df = spark.read
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
SQL
SELECT * FROM read_statestore(
'<checkpointLocation>',
readChangeFeed => true,
changeStartBatchId => 2
);
輸出結構包含額外 batch_id 與 change_type 欄位。 完整架構請參閱 狀態讀取器 API 選項與架構。
讀取結構化串流狀態中繼資料
可在 Databricks 執行環境 14.3 LTS 或更高版本上取得。 你可以讀取結構化串流查詢的狀態元資料資訊:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
傳回的數據具有下列架構:
| 欄 | 類型 | 說明 |
|---|---|---|
operatorId |
整數 | 具狀態串流運算子的整數識別碼。 |
operatorName |
字串 | 具狀態串流運算子的名稱。 |
stateStoreName |
字串 | 運算子狀態存放區的名稱。 |
numPartitions |
整數 | 狀態存放區的分割區數目。 |
minBatchId |
長 | 可供查詢狀態的最小批次識別碼。 |
maxBatchId |
長 | 可供查詢狀態的最大批次識別碼。 |
注意
minBatchId 和 maxBatchId 提供的批次標識碼值會反映檢查點寫入時的狀態。 舊批次會隨著微批次執行自動清理,因此此處提供的值不保證仍可供使用。
請參閱 read_state_metadata 資料表值函式。
範例:查詢數據流聯結的一端
使用下列語法來查詢資料流聯結的左側:
Python
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
範例:查詢具有多個狀態運算元的資料流的狀態儲存區
此範例會使用狀態元數據讀取器來收集具有多個具狀態運算符之串流查詢的元數據詳細數據,然後使用元數據結果作為狀態讀取器的選項。
狀態元數據讀取器會採用檢查點路徑作為唯一選項,如下列語法範例所示:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
Scala
val df = spark.read
.format("state-metadata")
.load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
下表代表狀態存放區元資料的範例輸出:
| operatorId | 操作員名稱 | stateStoreName | 分區數量 | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | 狀態存儲保存 | 預設 | 200 | 0 | 13 |
| 1 | dedupeWithinWatermark | 預設 | 200 | 0 | 13 |
若要取得 運算符的結果 dedupeWithinWatermark ,請使用 operatorId 選項查詢狀態讀取器,如下列範例所示:
Python
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
Scala
val leftDf = spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);