Partilhar via


O que é streaming stateful?

Esta página explica consultas de Streaming Estruturado com estado, incluindo operações com estado, recomendações de otimização, encadeamento de múltiplos operadores com estado e reequilíbrio de estados.

Uma consulta stateful de Streaming Estruturado requer atualizações incrementais para a informação de estado intermediária, enquanto uma consulta stateless de Streaming Estruturado apenas controla informações sobre quais linhas foram processadas da origem para o coletor. Para funcionalidades de otimização disponíveis para consultas sem estado, veja Otimizar consultas de streaming sem estado.

Operações com estado

As operações stateful incluem agregação de streaming, distinct, dropDuplicates, joins stream-stream e aplicações stateful personalizadas.

As informações de estado intermediário necessárias para consultas com estado em streaming estruturado podem causar problemas inesperados de latência e produção se configuradas incorretamente.

No Databricks Runtime 13.3 LTS ou posterior, pode ativar o ponto de verificação de alterações com o RocksDB para reduzir a duração dos pontos de verificação e a latência total para cargas de trabalho de Structured Streaming. O Databricks recomenda ativar o ponto de verificação do registo de alterações para todas as consultas com monitorização de estado de Transmissão em Fluxo Estruturada. Consulte Ativar ponto de verificação do registo de alterações.

Otimizar consultas de Streaming Estruturado com estado

A Databricks recomenda o seguinte para consultas de Streaming Estruturado com estado:

  • Utilize instâncias otimizadas para computação como processos de trabalho.
  • Defina o número de partições aleatórias para 1-2 vezes o número de núcleos no cluster.
  • Defina a configuração spark.sql.streaming.noDataMicroBatches.enabled como false no SparkSession. Isto impede que o motor de micro-batch de streaming processe micro-lotes que não contenham dados. Definir esta configuração para false pode também resultar em operações com estado que utilizam marcas de água ou tempos limite de processamento não emitirem dados até chegarem novos dados, em vez de emitirem imediatamente.

O Databricks recomenda o uso do RocksDB com o ponto de verificação do changelog para gerenciar o estado de fluxos com monitoração de estado. Consulte Configurar o armazenamento de estado do RocksDB no Azure Databricks.

Nota

O esquema de gestão de estado não pode ser alterado entre os reinícios de consultas. Se uma consulta tiver sido iniciada com o gerenciamento padrão, você deverá reiniciá-la do zero com um novo local de ponto de verificação para alterar o armazenamento de estado.

Trabalhar com múltiplos operadores de estado no Streaming Estruturado

No Databricks Runtime 13.3 LTS ou posterior, o Azure Databricks oferece suporte avançado para operadores com estado em cargas de trabalho de streaming estruturado. Pode encadear múltiplos operadores com estado; isto significa que é possível usar a saída de uma operação, como uma agregação em janela, numa outra operação com estado, como uma junção.

No Databricks Runtime 16.2 ou posterior, pode usar transformWithState em cargas de trabalho com múltiplos operadores com estado. Consulte Criar uma aplicação com estado personalizado.

Os exemplos a seguir demonstram vários padrões que você pode usar.

Importante

As seguintes limitações existem ao trabalhar com múltiplos operadores de estado:

  • Operadores com estado personalizados legados (FlatMapGroupWithState e applyInPandasWithState) não são suportados.
  • Apenas o modo de saída "acrescentar" é suportado.

Agregação de janela de tempo encadeada

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

linguagem de programação Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Agregação por janelas temporais em dois fluxos diferentes seguida de junção de janelas entre fluxos

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

linguagem de programação Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Junção de intervalos de tempo entre fluxos seguida de agregação com janela de tempo

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

linguagem de programação Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Rebalanceamento de estado para streaming estruturado

O rebalanceamento de estado é habilitado por padrão para todas as cargas de trabalho de streaming no Lakeflow Spark Declarative Pipelines. No Databricks Runtime 11.3 LTS ou posterior, pode definir a seguinte opção de configuração na configuração do cluster Spark para permitir o reequilíbrio de estado:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

O reequilíbrio de estado beneficia os pipelines de Streaming Estruturado com estado que passam por eventos de redimensionamento de clusters. As operações de streaming sem estado não beneficiam, independentemente das mudanças no tamanho dos clusters.

Nota

O dimensionamento automático de computação tem limitações para reduzir o tamanho do cluster para cargas de trabalho de Streaming Estruturado. A Databricks recomenda o uso do Lakeflow Spark Declarative Pipelines com dimensionamento automático aprimorado para cargas de trabalho de streaming. Consulte Otimize a utilização do cluster de pipelines declarativos do Lakeflow Spark com dimensionamento automático.

Os eventos de redimensionamento de cluster acionam o reequilíbrio de estado. Os microlotes podem ter maior latência durante eventos de rebalanceamento à medida que o estado é carregado do armazenamento em nuvem para os novos executores.