Compartilhar via


O que é streaming com estado?

Uma consulta de Streaming Estruturado com estado requer atualizações incrementais para informações de estado intermediário, enquanto uma consulta de Streaming Estruturado sem estado rastreia apenas informações sobre quais linhas foram processadas da origem para o coletor.

As operações com estado incluem agregação de streaming, dropDuplicates de streaming, junções fluxo-fluxo, mapGroupsWithState e flatMapGroupsWithState.

As informações de estado intermediário necessárias para consultas de Streaming Estruturado com estado poderão levar a problemas inesperados de latência e produção se não estiverem configuradas corretamente.

No Databricks Runtime 13.3 LTS e superior, você pode habilitar o ponto de verificação do changelog com RocksDB para reduzir a duração do ponto de verificação e a latência de ponta a ponta para cargas de trabalho de streaming estruturado. O Databricks recomenda habilitar o ponto de verificação do changelog para todas as consultas com estado de Streaming Estruturado. Confira Habilitar o ponto de verificação do log de alterações.

Otimizar consultas de Streaming Estruturado com estado

Gerenciar as informações de estado intermediário de consultas de Streaming Estruturado com estado pode ajudar a evitar problemas inesperados de latência e produção.

O Databricks recomenda:

  • Use instâncias otimizadas para computação como trabalhos.
  • De acordo com o número de partições aleatórias, de 1 a 2 vezes o número de núcleos no cluster.
  • Definir a configuração spark.sql.streaming.noDataMicroBatches.enabled como false na SparkSession. Isso impede que o mecanismo de micro lote de streaming processe micro lotes que não contêm dados. Observe também que definir essa configuração como false pode resultar em operações com estado que aproveitam marcas-d'água ou tempos-tempo de processamento para não obter saída de dados até que novos dados cheguem em vez de imediatamente.

O Databricks recomenda usar o RocksDB com o ponto de verificação do changelog para gerenciar o estado dos fluxos com estado. Confira Configurar o repositório de estado do RocksDB no Azure Databricks.

Observação

O esquema de gerenciamento de estado não pode ser alterado entre as reinicializações da consulta. Ou seja, se uma consulta tiver sido iniciada com o gerenciamento padrão, ela não poderá ser alterada sem iniciar a consulta do zero com um novo local de ponto de verificação.

Trabalhar com vários operadores com estado no Fluxo Estruturado

No Databricks Runtime 13.3 LTS e superior, o Azure Databricks oferece suporte avançado para operadores com estado em cargas de trabalho de streaming estruturado. Agora você pode encadear vários operadores com estado, o que significa que você pode alimentar a saída de uma operação, como uma agregação em janelas para outra operação com estado, como uma junção.

Os exemplos a seguir demonstram vários padrões que você pode usar.

Importante

As seguintes limitações existem ao trabalhar com vários operadores com estado:

  • Não há suporte para FlatMapGroupWithState.
  • Há suporte apenas para o modo de saída de acréscimo.

Agregação de janela de tempo encadeada

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Agregação de janela de tempo em dois fluxos diferentes seguidos de junção de fluxo a fluxo de janela

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Junção do intervalo de tempo de fluxo a fluxo seguido pela agregação da janela de tempo

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Rebalanceamento de estado para Streaming Estruturado

O rebalanceamento de estado é habilitado por padrão para todas as cargas de trabalho de streaming no Delta Live Tables. No Databricks Runtime 11.3 LTS e superior, você pode definir a seguinte opção de configuração na configuração do cluster Spark para habilitar o reequilíbrio de estado:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

O rebalanceamento de estado beneficia pipelines de Streaming Estruturado com estado que passam por eventos de redimensionamento de cluster. As operações de streaming sem estado não se beneficiam, independentemente da alteração do tamanhos do cluster.

Observação

O dimensionamento automático de computação tem limitação ao reduzir o tamanho do cluster para cargas de Fluxo Estruturado. O Databricks recomenda usar o Delta Live Tables com o Dimensionamento Automático Aprimorado para cargas de trabalho de fluxo. Consulte Otimizar a utilização do cluster de pipelines do Delta Live Tables com dimensionamento automático aprimorado.

Os eventos de redimensionamento de cluster fazem com que o rebalanceamento de estado seja disparado. Durante os eventos de rebalanceamento, os microlotes podem ter uma latência maior, pois o estado é carregado a partir do armazenamento em nuvem para os novos executores.

Especificar o estado inicial do mapGroupsWithState

É possível especificar um estado inicial definido pelo usuário para processamentos com estado do Fluxo Estruturado usando flatMapGroupsWithState ou mapGroupsWithState. Isso permite que você evite reprocessar dados ao iniciar um fluxo com estado sem um ponto de verificação válido.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Exemplo de caso de uso que especifica um estado inicial para o operador flatMapGroupsWithState:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Exemplo de caso de uso que especifica um estado inicial para o operador mapGroupsWithState:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Testar a mapGroupsWithState função de atualização

A API TestGroupState permite que você teste a função de atualização de estado usada para Dataset.groupByKey(...).mapGroupsWithState(...) e Dataset.groupByKey(...).flatMapGroupsWithState(...).

A função de atualização de estado usa o estado anterior como entrada usando um objeto do tipo GroupState. Consulte a documentação referência de GroupState do Apache Spark. Por exemplo:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}