You can use DataFrame operations or SQL table-valued functions to query Structured Streaming state data and metadata. Use these functions to observe state information for stateful Structured Streaming queries, which can be useful for monitoring and debugging.
You must have read access to the checkpoint path for a streaming query to query its state data or metadata. The functions described in this article provide read-only access to state data and metadata, and you can only query state information with batch read semantics.
Note
You cannot query state information for Lakeflow Spark Declarative Pipelines, streaming tables, or materialized views. You cannot query state information using serverless compute, or using compute configured with standard access mode on Databricks Runtime versions below 16.3.
Requirements
- Use one of the following compute configurations:
- Databricks Runtime 16.3 and above on compute configured with standard access mode.
- Databricks Runtime 14.3 LTS and above on compute configured with dedicated or no isolation access mode.
- Read access to the checkpoint path used by the streaming query.
Read Structured Streaming state store
You can read state store information for Structured Streaming queries executed in any supported Databricks Runtime. Use the following syntax:
Python
df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))
Scala
val df = spark.read
  .format("statestore")
  .load("/checkpoint/path")
SQL
SELECT * FROM read_statestore('/checkpoint/path')
State reader API options and schema
For a complete list of statestore format options, see State store.
The output data has the following schema:
| Column | Type | Description |
|---|---|---|
| key | Struct (further type derived from the state key) | The key for a stateful operator record in the state checkpoint. |
| value | Struct (further type derived from the state value) | The value for a stateful operator record in the state checkpoint. |
| partition_id | Integer | The partition of the state checkpoint that contains the stateful operator record. |
In Databricks Runtime 16.4 LTS and above, when the readChangeFeed option is set to true, the output data has the following schema:
| Column | Type | Description |
|---|---|---|
| batch_id | Long | The batch ID that the state change belongs to. |
| change_type | String | The type of change applied by the batch: update for inserts and updates, delete for deletions. |
| key | Struct (further type derived from the state key) | The key for a stateful operator record in the state checkpoint. |
| value | Struct (further type derived from the state value) | The value for a stateful operator record in the state checkpoint. null for records where change_type is delete. |
| partition_id | Integer | The partition of the state checkpoint that contains the stateful operator record. |
See read_statestore table-valued function.
Read Structured Streaming state changes
Available on Databricks Runtime 16.4 LTS and above. To read state changes across micro-batches, instead of viewing the full state at a single micro-batch, set readChangeFeed to true and specify changeStartBatchId. Optionally, specify changeEndBatchId. For a complete list of options, see State store.
For example, to read state changes from batch 2 through the latest committed batch:
Python
df = (spark.read
  .format("statestore")
  .option("readChangeFeed", True)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>"))
Scala
val df = spark.read
  .format("statestore")
  .option("readChangeFeed", true)
  .option("changeStartBatchId", 2)
  .load("<checkpointLocation>")
SQL
SELECT * FROM read_statestore(
  '<checkpointLocation>',
  readChangeFeed => true,
  changeStartBatchId => 2
);
The output schema includes additional batch_id and change_type columns. For the complete schema, see State reader API options and schema.
Read Structured Streaming state metadata
Available on Databricks Runtime 14.3 LTS and above. You can read state metadata information for Structured Streaming queries:
Python
df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))
Scala
val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
The returned data has the following schema:
| Column | Type | Description |
|---|---|---|
| operatorId | Integer | The integer ID of the stateful streaming operator. |
| operatorName | String | Name of the stateful streaming operator. |
| stateStoreName | String | Name of the state store of the operator. |
| numPartitions | Integer | Number of partitions of the state store. |
| minBatchId | Long | The minimum batch ID available for querying state. |
| maxBatchId | Long | The maximum batch ID available for querying state. |
Note
The batch ID values provided by minBatchId and maxBatchId reflect the state at the time the checkpoint was written. Old batches are cleaned up automatically with micro-batch execution, so the value provided here is not guaranteed to still be available.
See read_state_metadata table-valued function.
Example: Query one side of a stream-stream join
Use the following syntax to query the left side of a stream-stream join:
Python
left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))
Scala
val leftDf = spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
  '/checkpoint/path',
  joinSide => 'left'
);
Example: Query state store for stream with multiple stateful operators
This example uses the state metadata reader to gather metadata details of a streaming query with multiple stateful operators, then uses the metadata results as options for the state reader.
The state metadata reader takes the checkpoint path as the only option, as in the following syntax example:
Python
df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))
Scala
val df = spark.read
  .format("state-metadata")
  .load("<checkpointLocation>")
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
The following table represents an example output of state store metadata:
| operatorId | operatorName | stateStoreName | numPartitions | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSave | default | 200 | 0 | 13 |
| 1 | dedupeWithinWatermark | default | 200 | 0 | 13 |
To get results for the dedupeWithinWatermark operator, query the state reader with the operatorId option, as in the following example:
Python
dedupe_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))
Scala
val dedupeDf = spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path")
SQL
SELECT * FROM read_statestore(
  '/checkpoint/path',
  operatorId => 1
);