O que é streaming com estado?
Uma consulta de Streaming Estruturado com estado requer atualizações incrementais para informações de estado intermediário, enquanto uma consulta de Streaming Estruturado sem estado rastreia apenas informações sobre quais linhas foram processadas da origem para o coletor.
As operações com estado incluem agregação de streaming, dropDuplicates
de streaming, junções fluxo-fluxo, mapGroupsWithState
e flatMapGroupsWithState
.
As informações de estado intermediário necessárias para consultas de Streaming Estruturado com estado poderão levar a problemas inesperados de latência e produção se não estiverem configuradas corretamente.
No Databricks Runtime 13.3 LTS e superior, você pode habilitar o ponto de verificação do changelog com RocksDB para reduzir a duração do ponto de verificação e a latência de ponta a ponta para cargas de trabalho de streaming estruturado. O Databricks recomenda habilitar o ponto de verificação do changelog para todas as consultas com estado de Streaming Estruturado. Confira Habilitar o ponto de verificação do log de alterações.
Otimizar consultas de Streaming Estruturado com estado
Gerenciar as informações de estado intermediário de consultas de Streaming Estruturado com estado pode ajudar a evitar problemas inesperados de latência e produção.
O Databricks recomenda:
- Use instâncias otimizadas para computação como trabalhos.
- De acordo com o número de partições aleatórias, de 1 a 2 vezes o número de núcleos no cluster.
- Definir a configuração
spark.sql.streaming.noDataMicroBatches.enabled
comofalse
na SparkSession. Isso impede que o mecanismo de micro lote de streaming processe micro lotes que não contêm dados. Observe também que definir essa configuração comofalse
pode resultar em operações com estado que aproveitam marcas-d'água ou tempos-tempo de processamento para não obter saída de dados até que novos dados cheguem em vez de imediatamente.
O Databricks recomenda usar o RocksDB com o ponto de verificação do changelog para gerenciar o estado dos fluxos com estado. Confira Configurar o repositório de estado do RocksDB no Azure Databricks.
Observação
O esquema de gerenciamento de estado não pode ser alterado entre as reinicializações da consulta. Ou seja, se uma consulta tiver sido iniciada com o gerenciamento padrão, ela não poderá ser alterada sem iniciar a consulta do zero com um novo local de ponto de verificação.
Trabalhar com vários operadores com estado no Fluxo Estruturado
No Databricks Runtime 13.3 LTS e superior, o Azure Databricks oferece suporte avançado para operadores com estado em cargas de trabalho de streaming estruturado. Agora você pode encadear vários operadores com estado, o que significa que você pode alimentar a saída de uma operação, como uma agregação em janelas para outra operação com estado, como uma junção.
Os exemplos a seguir demonstram vários padrões que você pode usar.
Importante
As seguintes limitações existem ao trabalhar com vários operadores com estado:
- Não há suporte para
FlatMapGroupWithState
. - Há suporte apenas para o modo de saída de acréscimo.
Agregação de janela de tempo encadeada
Python
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Agregação de janela de tempo em dois fluxos diferentes seguidos de junção de fluxo a fluxo de janela
Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Junção do intervalo de tempo de fluxo a fluxo seguido pela agregação da janela de tempo
Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Rebalanceamento de estado para Streaming Estruturado
O rebalanceamento de estado é habilitado por padrão para todas as cargas de trabalho de streaming no Delta Live Tables. No Databricks Runtime 11.3 LTS e superior, você pode definir a seguinte opção de configuração na configuração do cluster Spark para habilitar o reequilíbrio de estado:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
O rebalanceamento de estado beneficia pipelines de Streaming Estruturado com estado que passam por eventos de redimensionamento de cluster. As operações de streaming sem estado não se beneficiam, independentemente da alteração do tamanhos do cluster.
Observação
O dimensionamento automático de computação tem limitação ao reduzir o tamanho do cluster para cargas de Fluxo Estruturado. O Databricks recomenda usar o Delta Live Tables com o Dimensionamento Automático Aprimorado para cargas de trabalho de fluxo. Consulte Otimizar a utilização do cluster de pipelines do Delta Live Tables com dimensionamento automático aprimorado.
Os eventos de redimensionamento de cluster fazem com que o rebalanceamento de estado seja disparado. Durante os eventos de rebalanceamento, os microlotes podem ter uma latência maior, pois o estado é carregado a partir do armazenamento em nuvem para os novos executores.
Especificar o estado inicial do mapGroupsWithState
É possível especificar um estado inicial definido pelo usuário para processamentos com estado do Fluxo Estruturado usando flatMapGroupsWithState
ou mapGroupsWithState
. Isso permite que você evite reprocessar dados ao iniciar um fluxo com estado sem um ponto de verificação válido.
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
Exemplo de caso de uso que especifica um estado inicial para o operador flatMapGroupsWithState
:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Exemplo de caso de uso que especifica um estado inicial para o operador mapGroupsWithState
:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Testar a mapGroupsWithState
função de atualização
A API TestGroupState
permite que você teste a função de atualização de estado usada para Dataset.groupByKey(...).mapGroupsWithState(...)
e Dataset.groupByKey(...).flatMapGroupsWithState(...)
.
A função de atualização de estado usa o estado anterior como entrada usando um objeto do tipo GroupState
. Consulte a documentação referência de GroupState do Apache Spark. Por exemplo:
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}