Read Structured Streaming state information
Important
This feature is in Public Preview.
In Databricks Runtime 14.3 LTS and above, you can use DataFrame operations or SQL table-valued functions to query Structured Streaming state data and metadata. These functions let you observe state information for stateful Structured Streaming queries, which is useful for monitoring and debugging.
You must have read access to the checkpoint path for a streaming query in order to query state data or metadata. The functions described in this article provide read-only access to state data and metadata. You can only use batch read semantics to query state information.
Note
You cannot query state information for Delta Live Tables pipelines, streaming tables, or materialized views.
Read Structured Streaming state store
You can read state store information for Structured Streaming queries executed in any supported Databricks Runtime. Use the following syntax:
Python
df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore('/checkpoint/path')
The following optional configurations are supported:
Option | Type | Default value | Description |
---|---|---|---|
batchId | Long | latest batch ID | Represents the target batch to read from. Specify this option to query state information for an earlier state of the query. The batch must be committed but not yet cleaned up. |
operatorId | Long | 0 | Represents the target operator to read from. Use this option when the query has multiple stateful operators. |
storeName | String | “DEFAULT” | Represents the target state store name to read from. Use this option when the stateful operator uses multiple state store instances. Either storeName or joinSide must be specified for a stream-stream join, but not both. |
joinSide | String (“left” or “right”) | | Represents the target side to read from. Use this option to read state from a stream-stream join. |
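As a rough illustration of how these options combine, the following sketch builds an option dictionary and passes it to the batch reader. The helper names `build_state_options` and `read_state` are hypothetical and not part of any Databricks API; only the `statestore` format and the option names come from the table above. The validation mirrors the rule that storeName and joinSide cannot both be set.

```python
from typing import Dict, Optional


def build_state_options(
    batch_id: Optional[int] = None,
    operator_id: Optional[int] = None,
    store_name: Optional[str] = None,
    join_side: Optional[str] = None,
) -> Dict[str, str]:
    """Hypothetical helper: collect only the options that were explicitly set."""
    # storeName and joinSide are mutually exclusive per the options table.
    if store_name is not None and join_side is not None:
        raise ValueError("Specify storeName or joinSide, not both")
    if join_side is not None and join_side not in ("left", "right"):
        raise ValueError('joinSide must be "left" or "right"')

    options: Dict[str, str] = {}
    if batch_id is not None:
        options["batchId"] = str(batch_id)
    if operator_id is not None:
        options["operatorId"] = str(operator_id)
    if store_name is not None:
        options["storeName"] = store_name
    if join_side is not None:
        options["joinSide"] = join_side
    return options


def read_state(spark, checkpoint_path: str, **kwargs):
    """Hypothetical wrapper: batch-read state from a checkpoint path."""
    options = build_state_options(**kwargs)
    return spark.read.format("statestore").options(**options).load(checkpoint_path)
```

For example, `read_state(spark, "/checkpoint/path", operator_id=1, join_side="left")` would read the left side of a stream-stream join, assuming the query has such a join running as operator 1.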
The returned data has the following schema:
Column | Type | Description |
---|---|---|
key | Struct (further type derived from the state key) | The key for a stateful operator record in the state checkpoint. |
value | Struct (further type derived from the state value) | The value for a stateful operator record in the state checkpoint. |
partition_id | Integer | The partition of the state checkpoint that contains the stateful operator record. |
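One possible use of this schema is checking how state records are spread across partitions. The sketch below assumes `state_df` is a DataFrame loaded with the `statestore` format as shown earlier; the function name `rows_per_partition` is illustrative only.

```python
def rows_per_partition(state_df):
    """Count state records in each state store partition.

    `state_df` is assumed to be a DataFrame read with format("statestore"),
    so it carries the `partition_id` column described in the schema above.
    """
    return (
        state_df
        .groupBy("partition_id")   # one group per state store partition
        .count()                   # number of state records in that partition
        .orderBy("partition_id")   # stable ordering for easier comparison
    )
```

Calling `rows_per_partition(df).show()` on a loaded state DataFrame prints one row per partition. A heavily uneven distribution may point to key skew, though that interpretation depends on the workload.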
Read Structured Streaming state metadata
Important
You must run streaming queries on Databricks Runtime 14.2 or above to record state metadata. State metadata files do not break backward compatibility. If you choose to run a streaming query on Databricks Runtime 14.1 or below, existing state metadata files are ignored and no new state metadata files are written.
You can read state metadata information for Structured Streaming queries run on Databricks Runtime 14.2 or above. Use the following syntax:
Python
df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
The returned data has the following schema:
Column | Type | Description |
---|---|---|
operatorId | Integer | The integer ID of the stateful streaming operator. |
operatorName | String | Name of the stateful streaming operator. |
stateStoreName | String | Name of the state store of the operator. |
numPartitions | Integer | Number of partitions of the state store. |
minBatchId | Long | The minimum batch ID available for querying state. |
maxBatchId | Long | The maximum batch ID available for querying state. |
Note
The batch ID values provided by minBatchId and maxBatchId reflect the state at the time the checkpoint was written. Old batches are cleaned up automatically with micro-batch execution, so the values provided here are not guaranteed to still be available.
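State metadata can help you discover which operators and state stores exist before you read state. The following sketch assumes the metadata rows have already been collected to the driver and converted to dictionaries (for example with `[r.asDict() for r in metadata_df.collect()]`). It builds one set of `statestore` reader options per operator and store. The function name `plan_state_reads` and its `batch_id` argument are hypothetical; the column names come from the schema above.

```python
from typing import Any, Dict, Iterable, List, Mapping, Optional


def plan_state_reads(
    metadata_rows: Iterable[Mapping[str, Any]],
    batch_id: Optional[int] = None,
) -> List[Dict[str, str]]:
    """Hypothetical helper: derive statestore reader options from metadata rows."""
    plans: List[Dict[str, str]] = []
    for row in metadata_rows:
        options = {
            "operatorId": str(row["operatorId"]),
            "storeName": str(row["stateStoreName"]),
        }
        if batch_id is not None:
            # Skip stores whose recorded batch range does not cover the request.
            if not (row["minBatchId"] <= batch_id <= row["maxBatchId"]):
                continue
            options["batchId"] = str(batch_id)
        plans.append(options)
    return plans
```

Each returned dictionary could then be passed to the reader, as in `spark.read.format("statestore").options(**plan).load("/checkpoint/path")`. Because old batches may already have been cleaned up, a batch ID inside the recorded range can still fail to load.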