Partilhar via


Ler informações de estado do Streaming Estruturado

Você pode usar operações DataFrame ou funções de valor de tabela SQL para consultar dados e metadados de estado do Streaming Estruturado. Utilize estas funções para monitorizar o estado das consultas com estado do Structured Streaming, que podem ser úteis para monitorização e resolução de problemas.

Você deve ter acesso de leitura ao caminho do ponto de verificação para uma consulta em fluxo contínuo a fim de consultar dados de estado ou metadados. As funções descritas neste artigo fornecem acesso somente leitura a dados de estado e metadados. Você só pode usar a semântica de leitura em lote para consultar informações de estado.

Nota

Não é possível consultar informações de estado para Lakeflow Spark Declarative Pipelines, tabelas de streaming ou exibições materializadas. Não é possível consultar informações de estado usando computação sem servidor ou computação configurada com o modo de acesso padrão.

Requerimentos

  • Use uma das seguintes configurações de computação:
    • Databricks Runtime 16.3 e superior na computação configurada com o modo de acesso padrão.
    • Databricks Runtime 14.3 LTS e superior em computação configurada com modo de acesso dedicado ou sem isolamento.
  • Acesso de leitura ao caminho do ponto de verificação usado pela consulta de streaming.

Ler armazenamento de estado de streaming estruturado

Você pode ler informações de armazenamento de estado para consultas de Streaming Estruturado executadas em qualquer Databricks Runtime suportado. Utilize a seguinte sintaxe:

Python

df = (spark.read
  .format("statestore")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore('/checkpoint/path')

Parâmetros da API do leitor de estado

A API do leitor de estado suporta as seguintes configurações opcionais:

Opção Tipo Valor padrão Descrição
batchId Longo ID do lote mais recente Representa o lote de destino a partir do qual ler. Especifique esta opção para consultar informações de estado para um estado anterior da consulta. O lote deve ser confirmado, mas ainda não finalizado.
operatorId Longo 0 Representa o operador de destino a partir do qual ler. Esta opção é usada quando a consulta utiliza múltiplos operadores com estado.
storeName Cadeia "PADRÃO" Representa o nome do armazenamento de estado de destino a partir do qual ler. Essa opção é usada quando o operador stateful usa várias instâncias de armazenamento de estado. Tanto storeName como joinSide devem ser especificados para uma junção fluxo-vapor, mas não ambos.
joinSide Cadeia de caracteres ("esquerda" ou "direita") Representa o lado de destino a partir do qual ler. Essa opção é usada quando os usuários querem ler o estado de uma junção de fluxos.
stateVarName Cadeia Nenhum O nome da variável de estado a ser lido como parte desta consulta. O nome da variável de estado é o nome exclusivo dado a cada variável dentro da init função de um StatefulProcessor usado pelo transformWithState operador. Esta opção é necessária se o transformWithState operador for usado. Esta opção aplica-se apenas ao operador transformWithState e é ignorada para outros operadores. Disponível no Databricks Runtime 16.2 e superior.
readRegisteredTimers booleano falso Defina como true para ler temporizadores registrados usados dentro do operador transformWithState. Esta opção aplica-se apenas ao operador transformWithState e é ignorada para outros operadores. Disponível no Databricks Runtime 16.2 e superior.
flattenCollectionTypes booleano verdadeiro Se true, nivela os registros retornados para variáveis de estado de mapa e lista. Se false, os registros são retornados usando um Spark SQL Array ou Map. Esta opção aplica-se apenas ao operador transformWithState e é ignorada para outros operadores. Disponível no Databricks Runtime 16.2 e superior.

Os dados retornados têm o seguinte esquema:

Coluna Tipo Descrição
key Struct (tipo adicional derivado da chave de estado) A chave para um registro de operador com estado no ponto de verificação de estado.
value Struct (tipo adicional derivado do valor de estado) O valor de um registo de operador com estado no ponto de verificação de estado.
partition_id Número inteiro A partição do ponto de verificação do estado que contém o registo do operador com estado.

Consulte read_statestore função de valor de tabela.

Ler metadados de estado do Streaming Estruturado

Importante

Você deve executar consultas de streaming no Databricks Runtime 14.2 ou superior para registrar metadados de estado. Os arquivos de metadados de estado não quebram a compatibilidade com versões anteriores. Se você optar por executar uma consulta de streaming no Databricks Runtime 14.1 ou inferior, os arquivos de metadados de estado existentes serão ignorados e nenhum novo arquivo de metadados de estado será gravado.

Você pode ler informações de metadados de estado para consultas de Streaming Estruturado executadas no Databricks Runtime 14.2 ou superior. Utilize a seguinte sintaxe:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

Os dados retornados têm o seguinte esquema:

Coluna Tipo Descrição
operatorId Número inteiro O ID inteiro do operador de streaming com estado.
operatorName Número inteiro Nome do operador de streaming com estado.
stateStoreName Cadeia Nome do armazenamento de estado do operador.
numPartitions Número inteiro Número de partições do armazenamento de estado.
minBatchId Longo O identificador de lote mínimo disponível para consulta de estado.
maxBatchId Longo O ID máximo de lote disponível para a consulta de estado.

Nota

Os valores de ID de lote fornecidos por minBatchId e maxBatchId refletem o estado no momento em que o ponto de verificação foi gravado. Os lotes antigos são limpos automaticamente com a execução de microlotes, portanto, não é garantido que o valor fornecido aqui ainda esteja disponível.

Consulte read_state_metadata função de valor de tabela.

Exemplo: Analisar um lado de uma junção de fluxos de dados

Use a sintaxe a seguir para consultar o lado esquerdo de uma associação de fluxo de fluxo:

Python

left_df = (spark.read
  .format("statestore")
  .option("joinSide", "left")
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    joinSide => 'left'
);

Exemplo: Consultar armazenamento de estado para fluxo com múltiplos operadores com estado

Este exemplo usa o leitor de metadados de estado para coletar detalhes de metadados de uma consulta de streaming com vários operadores com estado e, em seguida, usa os resultados de metadados como opções para o leitor de estado.

O leitor de metadados de estado usa o caminho do ponto de verificação como a única opção, como no exemplo de sintaxe a seguir:

Python

df = (spark.read
  .format("state-metadata")
  .load("<checkpointLocation>"))

SQL

SELECT * FROM read_state_metadata('/checkpoint/path')

A tabela a seguir representa um exemplo de saída de metadados de armazenamento de estado:

operatorId nome_do-operador stateStoreName [en] numPartições minBatchId maxBatchId
0 stateStoreSalvar predefinição 200 0 13
1 dedupeWithinWatermark predefinição 200 0 13

Para obter resultados para o dedupeWithinWatermark operador, consulte o leitor de estado com a operatorId opção, como no exemplo a seguir:

Python

left_df = (spark.read
  .format("statestore")
  .option("operatorId", 1)
  .load("/checkpoint/path"))

SQL

SELECT * FROM read_statestore(
    '/checkpoint/path',
    operatorId => 1
);