Compartir a través de


¿Qué es el streaming con estado?

Las consultas de Structured Streaming con estado requieren que se realicen actualizaciones incrementales de la información de estado intermedio. En cambio, las consultas de Structured Streaming sin estado solo realizan un seguimiento de las filas que se han procesado desde el origen al receptor.

Entre las operaciones con estado se incluyen la agregación de streaming, el streaming con el método dropDuplicates, las combinaciones de flujos de datos y los operadores mapGroupsWithState y flatMapGroupsWithState.

La información de estado intermedio que se necesita para las consultas de Structured Streaming con estado puede provocar problemas inesperados de latencia y de producción si no se configura correctamente.

En Databricks Runtime 13.3 LTS y versiones posteriores, puede habilitar los puntos de control del registro de cambios con RocksDB para reducir la duración del punto de control y la latencia de un extremo a otro para cargas de trabajo de flujo estructurado. Databricks recomienda habilitar los puntos de control del registro de cambios para todas las consultas con estado de Structured Streaming. Consulte Habilitar puntos de control del registro de cambios.

Optimización de las consultas de Structured Streaming con estado

Administrar la información de estado intermedio de las consultas de Structured Streaming con estado puede ayudarle a evitar problemas inesperados de latencia y producción.

Procedimientos recomendados para Databricks:

  • Use instancias optimizadas para proceso como roles de trabajo.
  • Establezca el número de particiones aleatorias en 1-2 veces el número de núcleos del clúster.
  • Establezca la configuración de spark.sql.streaming.noDataMicroBatches.enabled en false en SparkSession. Esto evita que el motor de microlotes de streaming procese microlotes que no contienen datos. Tenga también en cuenta que establecer esta configuración en false podría provocar operaciones con estado que aprovechan las marcas de agua o los tiempos de espera de procesamiento para no obtener la salida de datos hasta que llegan nuevos datos, en lugar de inmediatamente.

Databricks recomienda usar RocksDB con puntos de comprobación del registro de cambios para administrar el estado de las secuencias con estado. Consulte Configuración de almacenes de estados de RocksDB en Azure Databricks.

Nota:

El esquema de administración de estado no se puede cambiar de un reinicio de consulta al siguiente. Es decir, si una consulta se ha iniciado con la administración predeterminada, no se puede cambiar sin iniciar la consulta desde cero con una nueva ubicación de punto de comprobación.

Trabajar con múltiples operadores de estado en Structured Streaming

En Databricks Runtime 13.3 LTS y versiones posteriores, Azure Databricks ofrece soporte avanzado para operadores con estado en cargas de trabajo de Structured Streaming. Ahora puede encadenar varios operadores con estado, lo que significa que puede alimentar la salida de una operación, como una agregación en ventana, a otra operación con estado, como una unión.

Los siguientes ejemplos muestran varios patrones que puede utilizar.

Importante

Existen las siguientes limitaciones cuando se trabaja con múltiples operadores con estado:

  • No se admite FlatMapGroupWithState.
  • Solo se admite el modo de salida append.

Agregación de ventanas de tiempo encadenadas

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Agregación de intervalos de tiempo en dos flujos diferentes seguida de una unión de intervalos de tiempo flujo-flujo

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Combinación de intervalos de tiempo flujo-flujo seguida de agregación de ventanas de tiempo

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Reequilibrio de estado para el flujo estructurado

El reequilibrio de estado está habilitado de forma predeterminada para todas las cargas de trabajo de streaming en Delta Live Tables. En Databricks Runtime 11.3 LTS y versiones posteriores, puede establecer la siguiente opción de configuración en la configuración del clúster de Spark para habilitar el reequilibrio de estado:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

El reequilibrio de estado beneficia a las canalizaciones de Structured Streaming con estado que se someten a eventos de cambio de tamaño del clúster. Las operaciones de streaming sin estado no se benefician, independientemente de cambiar los tamaños del clúster.

Nota:

El escalado automático de proceso tiene limitaciones al reducir verticalmente el tamaño del clúster para cargas de trabajo de Structured Streaming. Databricks recomienda usar tablas Delta Live con escalado automático mejorado para cargas de trabajo de streaming. Consulte Optimización del uso del clúster de canalizaciones de Delta Live Tables con escalado automático mejorado.

Un evento de cambio de tamaño de clúster hará que se desencadene el reequilibrio de estado. Durante los eventos de reequilibrio, es posible que un microlote pueda tener una latencia mayor a medida que el estado se cargue desde el almacenamiento en la nube a los nuevos ejecutores.

Especificación del estado inicial para mapGroupsWithState

Puede especificar un estado inicial definido por el usuario para el procesamiento con estado de Structured Streaming mediante flatMapGroupsWithState o mapGroupsWithState. Esto le permite evitar el reprocesamiento de datos al iniciar una secuencia con estado sin un punto de comprobación válido.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Caso de uso de ejemplo que especifica un estado inicial para el operador flatMapGroupsWithState:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Caso de uso de ejemplo que especifica un estado inicial para el operador mapGroupsWithState:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Probar la función de actualización mapGroupsWithState

TestGroupState API permite probar la función de actualización de estado usada para Dataset.groupByKey(...).mapGroupsWithState(...) y Dataset.groupByKey(...).flatMapGroupsWithState(...).

La función de actualización de estado toma el estado anterior como entrada mediante un objeto del tipo GroupState. Consulte la documentación de referencia de GroupState de Apache Spark. Por ejemplo:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}