Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Você pode usar operações DataFrame ou funções de valor de tabela SQL para consultar metadados e dados de estado de Streaming Estruturado. Use essas funções para observar informações de estado para consultas com estado de Streaming Estruturado, que podem ser úteis para monitoramento e depuração.
É preciso ter acesso de leitura no caminho do ponto de verificação de uma consulta de streaming para consultar dados de estado ou metadados. As funções descritas neste artigo fornecem acesso somente leitura a dados de estado e metadados. Você só pode usar a semântica de leitura em lote para consultar informações de estado.
Observação
Não é possível consultar informações de estado para Pipelines Declarativas do Lakeflow Spark, tabelas de streaming ou exibições materializadas. Não é possível consultar informações de estado usando computação sem servidor ou computação configurada com o modo de acesso padrão.
Requisitos
- Use uma das seguintes configurações de computação:
- Databricks Runtime 16.3 e posteriores na computação configurada com o modo de acesso padrão.
- Databricks Runtime 14.3 LTS e posteriores na computação configurada com modo de acesso dedicado ou sem isolamento.
- Leia o acesso ao caminho do ponto de verificação usado pela consulta de streaming.
Ler o repositório de estado do Streaming Estruturado
Você pode ler as informações de armazenamento de estado para consultas de Streaming Estruturado executadas em qualquer Databricks Runtime com suporte. Use a seguinte sintaxe:
Python
df = (spark.read
.format("statestore")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore('/checkpoint/path')
Parâmetros da API do leitor de estado
A API de leitor de estado dá suporte às seguintes configurações opcionais:
| Opção | Tipo | Valor padrão | Descrição |
|---|---|---|---|
batchId |
Longo | ID do lote mais recente | Representa o lote de destino para leitura. Especifique essa opção para consultar informações de estado para um estado anterior da consulta. O lote precisa ser confirmado, mas ainda não foi limpo. |
operatorId |
Longo | 0 | Representa o operador de destino para leitura. Essa opção é usada quando a consulta está usando vários operadores com estado. |
storeName |
Cadeia de caracteres | "PADRÃO" | Representa o nome do repositório de estado de destino do qual a leitura será feita. Essa opção é usada quando o operador com estado usa várias instâncias do repositório de estado.
storeName ou joinSide precisa ser especificado para uma junção entre fluxo e fluxo, mas não para ambos. |
joinSide |
Cadeia de caracteres (“esquerda” ou “direita”) | Representa o lado de destino a ser lido. Essa opção é usada quando os usuários desejam ler o estado de uma junção de fluxos. | |
stateVarName |
Cadeia de caracteres | Nenhum | O nome da variável de estado a ser lido como parte dessa consulta. O nome da variável de estado é o nome exclusivo dado a cada variável dentro de uma função init de um StatefulProcessor usado pelo operador transformWithState. Essa opção será necessária se o transformWithState operador for usado. Essa opção só se aplica ao transformWithState operador e é ignorada para outros operadores. Disponível no Databricks Runtime 16.2 e superior. |
readRegisteredTimers |
booleano | falso | Defina para true para ler os temporizadores registrados usados pelo operador transformWithState. Essa opção só se aplica ao transformWithState operador e é ignorada para outros operadores. Disponível no Databricks Runtime 16.2 e superior. |
flattenCollectionTypes |
booleano | verdadeiro | Se true, achata os registros retornados para as variáveis de estado de mapa e lista. Se false, os registros são retornados usando Spark SQL Array ou Map. Essa opção só se aplica ao transformWithState operador e é ignorada para outros operadores. Disponível no Databricks Runtime 16.2 e superior. |
Os dados retornados têm o seguinte esquema:
| Coluna | Tipo | Descrição |
|---|---|---|
key |
Struct (tipo adicional derivado da chave de estado) | A chave de um registro do operador com estado no ponto de verificação de estado. |
value |
Struct (tipo adicional derivado do valor de estado) | O valor de um registro do operador com estado no ponto de verificação de estado. |
partition_id |
Inteiro | A partição do ponto de verificação de estado que contém o registro do operador com estado. |
Consulte TVF read_statestore.
Ler metadados de estado de Streaming Estruturado
Importante
Você precisa executar consultas de streaming no Databricks Runtime 14.2 ou superior para registrar metadados de estado. Os arquivos de metadados de estado não comprometem a compatibilidade com versões anteriores. Se você optar por executar uma consulta de streaming no Databricks Runtime 14.1 ou anterior, arquivos de metadados existentes serão ignorados e nenhum novo arquivo de metadados de estado será gravado.
Você pode ler as informações de metadados de estado para consultas de Streaming Estruturado executadas no Databricks Runtime 14.2 e posteriores. Use a seguinte sintaxe:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
Os dados retornados têm o seguinte esquema:
| Coluna | Tipo | Descrição |
|---|---|---|
operatorId |
Inteiro | A ID de inteiro do operador de streaming com estado. |
operatorName |
Inteiro | Nome do operador de streaming com estado. |
stateStoreName |
Cadeia de caracteres | Nome do repositório de estado do operador. |
numPartitions |
Inteiro | Número de partições do repositório de estado. |
minBatchId |
Longo | A ID mínima do lote disponível para consultar o estado. |
maxBatchId |
Longo | A ID máxima do lote disponível para consultar o estado. |
Observação
Os valores de ID do lote fornecidos por minBatchId e maxBatchId refletem o estado no momento em que o ponto de verificação foi gravado. Lotes antigos são limpos automaticamente com execução de microlotes, então não há garantia que o valor fornecido aqui ainda esteja disponível.
Consulte TVF read_state_metadata.
Exemplo: consultar um lado de uma junção entre fluxos
Use a seguinte sintaxe para consultar o lado esquerdo de uma junção de fluxo de fluxo:
Python
left_df = (spark.read
.format("statestore")
.option("joinSide", "left")
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
joinSide => 'left'
);
Exemplo: consultar o repositório de estado para fluxo com vários operadores com estado
Esses exemplos usam o leitor de metadados de estado para coletar detalhes de metadados de uma consulta de streaming com vários operadores com estado e, em seguida, usa os resultados dos metadados como opções para o leitor de estado.
O leitor de metadados de estado usa o caminho do ponto de verificação como a única opção, como no exemplo de sintaxe a seguir:
Python
df = (spark.read
.format("state-metadata")
.load("<checkpointLocation>"))
SQL
SELECT * FROM read_state_metadata('/checkpoint/path')
A tabela a seguir representa um exemplo de saída dos metadados do repositório de estado:
| operatorId | operatorName | stateStoreName | numPartitions | minBatchId | maxBatchId |
|---|---|---|---|---|---|
| 0 | stateStoreSave | padrão | 200 | 0 | 13 |
| 1 | dedupeWithinWatermark | padrão | 200 | 0 | 13 |
Para obter resultados para o dedupeWithinWatermark operador, consulte o leitor de estado com a opção operatorId , como no exemplo a seguir:
Python
left_df = (spark.read
.format("statestore")
.option("operatorId", 1)
.load("/checkpoint/path"))
SQL
SELECT * FROM read_statestore(
'/checkpoint/path',
operatorId => 1
);