¿Qué es el streaming con estado?
Las consultas de Structured Streaming con estado requieren que se realicen actualizaciones incrementales de la información de estado intermedio. En cambio, las consultas de Structured Streaming sin estado solo realizan un seguimiento de las filas que se han procesado desde el origen al receptor.
Entre las operaciones con estado se incluyen la agregación de streaming, el streaming con el método dropDuplicates
, las combinaciones de flujos de datos y los operadores mapGroupsWithState
y flatMapGroupsWithState
.
La información de estado intermedio que se necesita para las consultas de Structured Streaming con estado puede provocar problemas inesperados de latencia y de producción si no se configura correctamente.
En Databricks Runtime 13.3 LTS y versiones posteriores, puede habilitar los puntos de control del registro de cambios con RocksDB para reducir la duración del punto de control y la latencia de un extremo a otro para cargas de trabajo de flujo estructurado. Databricks recomienda habilitar los puntos de control del registro de cambios para todas las consultas con estado de Structured Streaming. Consulte Habilitar puntos de control del registro de cambios.
Optimización de las consultas de Structured Streaming con estado
Administrar la información de estado intermedio de las consultas de Structured Streaming con estado puede ayudarle a evitar problemas inesperados de latencia y producción.
Procedimientos recomendados para Databricks:
- Use instancias optimizadas para proceso como roles de trabajo.
- Establezca el número de particiones aleatorias en 1-2 veces el número de núcleos del clúster.
- Establezca la configuración de
spark.sql.streaming.noDataMicroBatches.enabled
enfalse
en SparkSession. Esto evita que el motor de microlotes de streaming procese microlotes que no contienen datos. Tenga también en cuenta que establecer esta configuración enfalse
podría provocar operaciones con estado que aprovechan las marcas de agua o los tiempos de espera de procesamiento para no obtener la salida de datos hasta que llegan nuevos datos, en lugar de inmediatamente.
Databricks recomienda usar RocksDB con puntos de comprobación del registro de cambios para administrar el estado de las secuencias con estado. Consulte Configuración de almacenes de estados de RocksDB en Azure Databricks.
Nota:
El esquema de administración de estado no se puede cambiar de un reinicio de consulta al siguiente. Es decir, si una consulta se ha iniciado con la administración predeterminada, no se puede cambiar sin iniciar la consulta desde cero con una nueva ubicación de punto de comprobación.
Trabajar con múltiples operadores de estado en Structured Streaming
En Databricks Runtime 13.3 LTS y versiones posteriores, Azure Databricks ofrece soporte avanzado para operadores con estado en cargas de trabajo de Structured Streaming. Ahora puede encadenar varios operadores con estado, lo que significa que puede alimentar la salida de una operación, como una agregación en ventana, a otra operación con estado, como una unión.
Los siguientes ejemplos muestran varios patrones que puede utilizar.
Importante
Existen las siguientes limitaciones cuando se trabaja con múltiples operadores con estado:
- No se admite
FlatMapGroupWithState
. - Solo se admite el modo de salida append.
Agregación de ventanas de tiempo encadenadas
Python
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Agregación de intervalos de tiempo en dos flujos diferentes seguida de una unión de intervalos de tiempo flujo-flujo
Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Combinación de intervalos de tiempo flujo-flujo seguida de agregación de ventanas de tiempo
Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Reequilibrio de estado para el flujo estructurado
El reequilibrio de estado está habilitado de forma predeterminada para todas las cargas de trabajo de streaming en Delta Live Tables. En Databricks Runtime 11.3 LTS y versiones posteriores, puede establecer la siguiente opción de configuración en la configuración del clúster de Spark para habilitar el reequilibrio de estado:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
El reequilibrio de estado beneficia a las canalizaciones de Structured Streaming con estado que se someten a eventos de cambio de tamaño del clúster. Las operaciones de streaming sin estado no se benefician, independientemente de cambiar los tamaños del clúster.
Nota:
El escalado automático de proceso tiene limitaciones al reducir verticalmente el tamaño del clúster para cargas de trabajo de Structured Streaming. Databricks recomienda usar tablas Delta Live con escalado automático mejorado para cargas de trabajo de streaming. Consulte Optimización del uso del clúster de canalizaciones de Delta Live Tables con escalado automático mejorado.
Un evento de cambio de tamaño de clúster hará que se desencadene el reequilibrio de estado. Durante los eventos de reequilibrio, es posible que un microlote pueda tener una latencia mayor a medida que el estado se cargue desde el almacenamiento en la nube a los nuevos ejecutores.
Especificación del estado inicial para mapGroupsWithState
Puede especificar un estado inicial definido por el usuario para el procesamiento con estado de Structured Streaming mediante flatMapGroupsWithState
o mapGroupsWithState
. Esto le permite evitar el reprocesamiento de datos al iniciar una secuencia con estado sin un punto de comprobación válido.
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
Caso de uso de ejemplo que especifica un estado inicial para el operador flatMapGroupsWithState
:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Caso de uso de ejemplo que especifica un estado inicial para el operador mapGroupsWithState
:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Probar la función de actualización mapGroupsWithState
TestGroupState
API permite probar la función de actualización de estado usada para Dataset.groupByKey(...).mapGroupsWithState(...)
y Dataset.groupByKey(...).flatMapGroupsWithState(...)
.
La función de actualización de estado toma el estado anterior como entrada mediante un objeto del tipo GroupState
. Consulte la documentación de referencia de GroupState de Apache Spark. Por ejemplo:
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}