Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Importante
Esta característica está en versión preliminar pública en Databricks Runtime 16.2 y versiones posteriores.
Puede crear aplicaciones de streaming mediante operadores con estado personalizados para implementar soluciones de baja latencia y casi en tiempo real que usan lógica con estado arbitraria. Los operadores con estado personalizado desbloquean nuevos casos de uso operativos y patrones no disponibles a través del procesamiento tradicional de Structured Streaming.
Nota:
Databricks recomienda usar la funcionalidad de Structured Streaming integrada para operaciones con estado admitidas, como agregaciones, desduplicación y combinaciones de streaming. Consulte ¿Qué es el streaming con estado?.
Databricks recomienda usar transformWithState sobre operadores heredados para transformaciones de estado arbitrarias. Para obtener documentación sobre los operadores heredados flatMapGroupsWithState y mapGroupsWithState, consulte Operadores con estado arbitrarios heredados.
Requisitos
El operador transformWithState y las API y clases relacionadas tienen los siguientes requisitos:
- Disponible en Databricks Runtime 16.2 y versiones posteriores.
- La computación debe usar el modo de acceso dedicado o sin aislamiento, excepto que el modo de acceso estándar sea compatible para Python (
transformWithStateInPandas) en Databricks Runtime 16.3 y versiones posteriores, y para Scala (transformWithState) en Databricks Runtime 17.3 y versiones posteriores. - Debe usar el proveedor del almacén de estado de RocksDB. Databricks recomienda habilitar RocksDB como parte de la configuración de proceso.
Nota:
Para habilitar el proveedor de almacén de estado de RocksDB para la sesión actual, ejecute lo siguiente:
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
¿Qué es transformWithState?
El transformWithState operador aplica un procesador con estado personalizado a una consulta de streaming estructurado. Debe implementar un procesador con estado personalizado para usar transformWithState. Structured Streaming incluye APIs para desarrollar tu procesador de estado utilizando Python, Scala o Java.
Se usa transformWithState para aplicar lógica personalizada a una clave de agrupación para los registros procesados incrementalmente con Structured Streaming. A continuación se describe el diseño de alto nivel:
- Defina una o varias variables de estado.
- La información de estado se conserva para cada clave de agrupación y se puede acceder a ella para cada variable de estado según la lógica definida por el usuario.
- Para cada microlote procesado, todos los registros de la clave están disponibles como iterador.
- Utiliza manejadores incorporados para controlar cuándo y cómo se emiten los registros en función de los temporizadores y las condiciones definidas por el usuario.
- Los valores de estado admiten definiciones individuales de período de vida (TTL), lo que permite flexibilidad para administrar la expiración del estado y el tamaño del estado.
Dado que transformWithState admite la evolución del esquema en el almacén de estado, puede iterar y actualizar las aplicaciones de producción sin perder información de estado histórico ni necesitar volver a procesar registros, lo que proporciona flexibilidad para el desarrollo y la facilidad de mantenimiento. Consulte Evolución del esquema en el almacén de estados.
Importante
PySpark usa el operador transformWithStateInPandas en lugar de transformWithState. La documentación de Azure Databricks usa transformWithState para describir la funcionalidad de las implementaciones de Python y Scala.
Las implementaciones de Scala y Python de transformWithState y las API relacionadas difieren debido a los detalles del lenguaje, pero proporcionan la misma funcionalidad. Consulte ejemplos específicos del lenguaje de programación y documentación de API para su lenguaje de programación preferido.
Gestión integrada de procesamiento
Implemente la lógica principal de su aplicación personalizada con estado mediante el uso de controladores utilizando manejadores integrados.
- Los manejadores proporcionan los métodos para manejar valores de estado y temporizadores, procesar registros entrantes y emitir registros.
- Los manejadores definen tu lógica personalizada basada en eventos.
Los manejadores de cada tipo de estado se implementan según la estructura de datos subyacente, pero cada uno contiene funcionalidad para obtener, insertar, actualizar y eliminar registros.
Los controladores se implementan en función de los eventos observados en registros de entrada o temporizadores mediante la semántica siguiente:
- Defina un controlador mediante el
handleInputRowsmétodo para controlar cómo se procesan los datos, se actualiza el estado y se emiten registros para cada microlote de registros procesados para la clave de agrupación. Consulte Controlar filas de entrada. - Defina un controlador mediante el
handleExpiredTimermétodo para usar umbrales basados en tiempo para ejecutar lógica si se procesan o no registros adicionales para la clave de agrupación. Consulte Eventos programados.
En la tabla siguiente se incluye una comparación de los comportamientos funcionales admitidos por estos controladores:
| Comportamiento | handleInputRows |
handleExpiredTimer |
|---|---|---|
| Obtener, colocar, actualizar o borrar valores de estado | Sí | Sí |
| Creación o eliminación de un temporizador | Sí | Sí |
| Emitir registros | Sí | Sí |
| Iteración de registros en el micro lote actual | Sí | No |
| Activar la lógica basada en el tiempo transcurrido | No | Sí |
Puede combinar handleInputRows e handleExpiredTimer implementar lógica compleja según sea necesario.
Por ejemplo, podría implementar una aplicación que use handleInputRows para actualizar los valores de estado de cada microlote y establecer un temporizador de 10 segundos en el futuro. Si no se procesan registros adicionales, puede usar handleExpiredTimer para emitir los valores actuales en el almacén de estado. Si se procesan nuevos registros para la clave de agrupación, puede borrar el temporizador existente y establecer un nuevo temporizador.
Tipos de estado personalizados
Puede implementar varios objetos de estado en un único operador con estado. Los nombres que asigne a cada objeto de estado persisten en el almacén de estado, al que puede acceder con el lector del almacén de estado. Si el objeto de estado usa un StructType, se proporcionan nombres para cada campo de la estructura mientras se pasa el esquema. Estos nombres también son visibles al leer el almacén de estado. Consulte Leer información de estado de Structured Streaming.
La funcionalidad proporcionada por clases y operadores integrados está pensada para proporcionar flexibilidad y extensibilidad, y las opciones de implementación deben ser informadas por la lógica completa que la aplicación debe ejecutar. Por ejemplo, puede implementar una lógica casi idéntica mediante un ValueState agrupado por campos user_id o session_id un MapState agrupado por user_id donde session_id es la clave de MapState. En este caso, MapState podría ser la implementación preferida si la lógica necesita evaluar las condiciones entre varios session_ids.
En las secciones siguientes se describen los tipos de estado admitidos por transformWithState.
ValueState
Para cada clave de agrupación, hay un valor asociado.
Un estado de valor puede incluir tipos complejos, como una estructura o una tupla. Al actualizar un ValueState, se implementa lógica para reemplazar todo el valor. El TTL de un estado de valor se restablece cuando el valor se actualiza, pero no se restablece si se procesa una clave de origen que coincide con un ValueState sin actualizar el almacenado ValueState.
ListState
Para cada clave de agrupación, hay una lista asociada.
Un estado de lista es una colección de valores, cada uno de los cuales puede incluir tipos complejos. Cada valor de una lista tiene su propio TTL. Puede agregar elementos a una lista anexando elementos individuales, anexando una lista de elementos o sobrescribiendo toda la lista con .put Solo la operación put se considera una actualización para restablecer el TTL.
EstadoDelMapa
Para cada clave de agrupación, hay un mapa asociado. Los mapas son el equivalente funcional de Apache Spark a un dict de Python.
Importante
Las claves de agrupación describen los campos especificados en la cláusula de consulta GROUP BY Structured Streaming. Los estados de mapa contienen un número arbitrario de pares clave-valor para una clave de agrupación.
Por ejemplo, si agrupa por user_id y desea definir un mapa para cada session_id, la clave de agrupación es user_id y la clave del mapa es session_id.
Un estado de mapa es una colección de claves distintas que cada una asigna a un valor que puede incluir tipos complejos. Cada par clave-valor de un mapa tiene su propio TTL. Puede actualizar el valor de una clave específica o quitar una clave y su valor. Puede devolver un valor individual mediante su clave, enumerar todas las claves, enumerar todos los valores o devolver un iterador para trabajar con el conjunto completo de pares clave-valor en el mapa.
Inicialización de una variable de estado personalizada
Al inicializar StatefulProcessor, se crea una variable local para cada objeto de estado que permite interactuar con objetos de estado en la lógica personalizada. Las variables de estado se definen e inicializan sobrescribiendo el método integrado init en la clase StatefulProcessor.
Defina una cantidad arbitraria de objetos de estado mediante los getValueStatemétodos , getListStatey getMapState al inicializar StatefulProcessor.
Cada objeto de estado debe tener lo siguiente:
- Un nombre único
- Esquema especificado
- En Python, el esquema se especifica explícitamente.
- En Scala, pase un
Encoderpara especificar el esquema de estado.
También puede proporcionar una duración opcional de período de vida (TTL) en milisegundos. Si implementa un estado de mapa, debe proporcionar una definición de esquema independiente para las claves de mapa y los valores.
Nota:
La lógica sobre cómo se consulta, actualiza y emite la información de estado se gestiona por separado. Consulte Uso de las variables de estado.
Aplicación con estado de ejemplo
A continuación se muestra la sintaxis básica para definir y usar un procesador con estado personalizado con transformWithState, incluidas las variables de estado de ejemplo para cada tipo admitido. Para obtener más ejemplos, vea Ejemplo de aplicaciones con estado.
Nota:
Python usa tuplas para todas las interacciones con valores de estado. Esto significa que el código de Python debe pasar valores mediante tuplas al usar operaciones como put y update, y esperar manejar las tuplas al usar get.
Por ejemplo, si el esquema del estado de valor es solo un entero único, implementaría código como el siguiente:
current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0] # Extracts the first item in the tuple
new_value = current_value + 1 # Calculate a new value
value_state.update((new_value,)) # Pass the new value formatted as a tuple
Esto también es cierto para los elementos en un ListState o los valores en un MapState.
Pitón
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
# Schema can also be defined using strings and SQL DDL syntax
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
count = 0
for pdf in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
pdf_count = pdf.count()
count += pdf_count.get("value")
self.value_state.update((count,)) # Count is passed as a tuple
iter = self.list_state.get()
list_state_value = next(iter1)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield pd.DataFrame({"id": key, "countAsString": str(count)})
q = (df.groupBy("key")
.transformWithStateInPandas(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
Scala
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
@transient private var countState: ValueState[Int] = _
@transient private var listState: ListState[Int] = _
@transient private var mapState: MapState[String, Int] = _
override def init(
outputMode: OutputMode,
timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState",
Encoders.scalaLong, TTLConfig.NONE)
listState = getHandle.getListState[Int]("listState",
Encoders.scalaInt, TTLConfig.NONE)
mapState = getHandle.getMapState[String, Int]("mapState",
Encoders.STRING, Encoders.scalaInt, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
var count = countState.getOption().getOrElse(0)
for (row <- inputRows) {
val listData = Array(120, 20)
listState.put(listData)
listState.appendValue(count)
listState.appendList(listData)
count += 1
}
val iter = listState.get()
var listStateValue = 0
if (iter.hasNext) {
listStateValue = iter.next()
}
countState.update(count)
var value = count
val userKey = "userKey"
if (mapState.exists()) {
if (mapState.containsKey(userKey)) {
value += mapState.getValue(userKey)
}
}
mapState.updateValue(userKey, value)
Iterator((key, count.toString))
}
}
val q = spark
.readStream
.format("delta")
.load("$srcDeltaTableDir")
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new SimpleCounterProcessor(),
TimeMode.None(),
OutputMode.Update(),
)
.writeStream...
StatefulProcessorHandle
PySpark incluye la StatefulProcessorHandle clase para proporcionar acceso a funciones que controlan cómo interactúa el código de Python definido por el usuario con la información de estado. Siempre debe importar y pasar StatefulProcessorHandle a la variable handle cuando se inicializa un StatefulProcessor.
La handle variable vincula la variable local de la clase de Python a la variable de estado.
Nota:
Scala usa el getHandle método .
Especificar el estado inicial
Opcionalmente, puede proporcionar un estado inicial para usarlo con el primer micro-lote. Esto puede ser útil al migrar un flujo de trabajo existente a una nueva aplicación personalizada, actualizar un operador con estado para cambiar el esquema o la lógica, o reparar un error que no se puede reparar automáticamente y requiere intervención manual.
Nota:
Use el lector del almacén de estado para consultar la información de estado de un punto de control existente. Consulte Leer información de estado de Structured Streaming.
Si va a convertir una tabla Delta existente en una aplicación con estado, lea la tabla mediante spark.read.table("table_name") y pase el dataframe resultante. Puede elegir opcionalmente seleccionar o modificar campos para que se ajusten a su nueva aplicación de estado.
Proporcione un estado inicial mediante un DataFrame con el mismo esquema de clave de agrupación que las filas de entrada.
Nota:
Python usa handleInitialState para especificar el estado inicial al definir un StatefulProcessor. Scala usa la clase distinta StatefulProcessorWithInitialState.
Usa tus variables de estado
Los objetos de estado admitidos proporcionan métodos para obtener el estado, actualizar la información de estado existente o borrar el estado actual. Cada tipo de estado admitido tiene una implementación única de métodos que corresponden a la estructura de datos implementada.
Cada clave de agrupación observada tiene información de estado dedicada.
- Los registros se generan según la lógica que implemente y el esquema de salida que especifique. Vea Emisión de registros.
- Puede acceder a los valores en el almacenamiento de estado usando el lector
statestore. Este lector tiene funcionalidad por lotes y no está pensado para cargas de trabajo de baja latencia. Consulte Leer información de estado de Structured Streaming. - La lógica especificada con
handleInputRowssolo se activa si los registros de la clave están presentes en un microproceso. Consulte Controlar filas de entrada. - Use
handleExpiredTimerpara implementar lógica basada en el tiempo que no depende de observar registros para activarse. Consulte Eventos programados.
Nota:
Los objetos de estado están aislados mediante la agrupación de claves con las siguientes implicaciones:
- Los valores de estado no se pueden ver afectados por los registros asociados a una clave de agrupación diferente.
- No se puede implementar lógica que dependa de comparar valores o actualizar el estado entre claves de agrupación.
Puede comparar valores dentro de una clave de agrupación. Usa un MapState para implementar lógica con una segunda clave que tu lógica personalizada pueda usar. Por ejemplo, agrupar por user_id y usar la dirección IP para identificar su MapState le permitiría implementar la lógica que rastrea las sesiones simultáneas de usuario.
Consideraciones avanzadas para trabajar con el estado
Modificar una variable de estado desencadena una escritura en RocksDB. Para optimizar el rendimiento, Databricks recomienda procesar todos los valores del iterador para una clave determinada y confirmar actualizaciones en una sola escritura siempre que sea posible.
Nota:
Las actualizaciones de estado son tolerantes a errores. Si una tarea se bloquea antes de que un micro-lote haya terminado de procesarse, el valor del último micro-lote exitoso se usa en el reintento.
Los valores de estado no tienen valores predeterminados integrados. Si su lógica requiere leer información de estado existente, use el método exists al implementar su lógica.
Nota:
MapState las variables tienen funcionalidad adicional para comprobar si hay claves individuales o enumerar todas las claves para implementar lógica para el estado NULL.
Emitir registros
La lógica definida por el usuario controla cómo transformWithState emite registros. Los registros se emiten por clave de agrupación.
Las aplicaciones con estado personalizado no asumen cómo se usa la información de estado al determinar cómo emitir registros y el número devuelto de registros para una condición determinada puede ser ninguno, uno o varios.
Implemente lógica para emitir registros mediante handleInputRows o handleExpiredTimer. Consulte Manejo de filas de entrada y Programar eventos temporizados.
Nota:
Puede implementar varios valores de estado y definir varias condiciones para emitir registros, pero todos los registros emitidos deben usar el mismo esquema.
Pitón
En Python, definirá el esquema de salida mediante la outputStructType palabra clave al llamar a transformWithStateInPandas.
Se emiten registros mediante un objeto Pandas DataFrame y yield.
Opcionalmente yield , puede ser un DataFrame vacío. Cuando se combina con el update modo de salida, la emisión de un dataframe vacío actualiza los valores de la clave de agrupación para que sea NULL.
Scala
En Scala, se emiten registros mediante un Iterator objeto . El esquema de la salida se deriva de los registros emitidos.
Puede generar opcionalmente un Iterator vacío. Cuando se combina con el update modo de salida, la emisión de un valor vacío Iterator actualiza los valores de la clave de agrupación para que sea NULL.
Controlar filas de entrada
Use el handleInputRows método para definir la lógica de cómo interactúan los registros observados en la consulta de streaming y actualizan los valores de estado. El controlador que defina con el handleInputRows método se ejecuta cada vez que se procesan los registros a través de la consulta Structured Streaming.
Para la mayoría de las aplicaciones con estado implementadas con transformWithState, la lógica principal se define mediante handleInputRows.
Para cada actualización por lotes procesada, todos los registros del microlote para una clave de agrupación determinada están disponibles mediante un iterador. La lógica definida por el usuario puede interactuar con todos los registros del microbatch actual y los valores del almacén de estado.
Programar eventos temporizados
Puede usar temporizadores para implementar lógica personalizada basada en el tiempo transcurrido desde una condición especificada.
Trabaja con temporizadores implementando el método handleExpiredTimer.
Dentro de una clave de agrupación, los temporizadores se identifican de forma única mediante su marca de tiempo.
Cuando expira un temporizador, el resultado viene determinado por la lógica implementada en la aplicación. Entre los patrones comunes se incluyen:
- Emitir información almacenada en una variable de estado.
- Eliminar la información de estado almacenada.
- Creación de un nuevo temporizador.
Los temporizadores expirados se activan incluso si no se procesan registros para su clave asociada en un microproceso.
Especificar el modelo de hora
Al pasar su StatefulProcessor a transformWithState, debe especificar el modelo de tiempo. Se admiten las siguientes opciones:
ProcessingTimeEventTime-
NoTimeoTimeMode.None()
Especificar NoTime significa que no se admiten temporizadores para el procesador.
Valores de temporizador integrados
Databricks recomienda no invocar el reloj del sistema en su aplicación personalizada con estado, ya que esto puede conducir a reintentos no fiables en caso de fallos en tareas. Use los métodos de la TimerValues clase cuando deba tener acceso al tiempo de procesamiento o a la marca de agua:
TimerValues |
Descripción |
|---|---|
getCurrentProcessingTimeInMs |
Devuelve la marca de tiempo del tiempo de procesamiento del lote actual en milisegundos desde la época. |
getCurrentWatermarkInMs |
Devuelve la marca de tiempo del indicador temporal del lote actual en milisegundos desde el comienzo de la época. |
Nota:
El tiempo de procesamiento describe el tiempo en que el micro-lote es procesado por Apache Spark. Muchos orígenes de streaming, como Kafka, también incluyen tiempo de procesamiento del sistema.
Las marcas de agua en las consultas de streaming suelen definirse respecto al tiempo de evento o en el tiempo de procesamiento de la fuente de streaming. Consulte Aplicar marcas de agua para controlar los umbrales de procesamiento de datos.
Tanto las marcas de agua como las ventanas se pueden usar en combinación con transformWithState. Puede implementar una funcionalidad similar en su aplicación con estado personalizada utilizando TTL, temporizadores y funcionalidades de MapState o ListState.
¿Qué es el tiempo de vida del estado (TTL)?
Los valores de estado usados por transformWithState cada uno admiten una especificación opcional de período de vida (TTL). Cuando el TTL expira, el valor se elimina del almacén de estado. TTL solo interactúa con los valores en el almacén de estado, lo que significa que puede implementar lógica para eliminar la información de estado, pero no puede desencadenar directamente la lógica cuando TTL elimina los valores de estado.
Importante
Si no implementa TTL, debe gestionar la expulsión de estado mediante otra lógica para evitar el crecimiento de estado sin fin.
TTL se aplica para cada valor de estado, con reglas diferentes para cada tipo de estado.
- Las variables de estado se limitan a las claves de agrupación.
- En el caso
ValueStatede los objetos, solo se almacena un valor único por clave de agrupación. TTL se aplica a este valor. - En
ListStateel caso de los objetos, la lista puede contener muchos valores. TTL se aplica a cada valor de una lista de forma independiente. - Para
MapStateobjetos, cada clave de asignación tiene un valor de estado asociado. TTL se aplica de forma independiente a cada par clave-valor de un mapa.
Para todos los tipos de estado, TTL se restablece si se actualiza la información de estado.
Nota:
Aunque TTL se limita a valores individuales de ListState, la única manera de actualizar un valor de una lista es usar el método put para sobrescribir todo el contenido de la variable ListState.
¿Cuál es la diferencia entre temporizadores y TTL?
Hay cierta superposición entre temporizadores y tiempo de vida (TTL) para las variables de estado, pero los temporizadores proporcionan un conjunto más amplio de características que TTL.
TTL expulsa la información de estado que no se ha actualizado durante el período especificado por el usuario. Esto permite a los usuarios evitar el crecimiento del estado descontrolado y quitar entradas de estado obsoletas. Dado que los mapas y las listas implementan TTL para cada valor, puede implementar funciones que solo tengan en cuenta los valores de estado que se han actualizado recientemente estableciendo TTL.
Los temporizadores permiten definir lógica personalizada más allá de la expulsión de estado, incluida la emisión de registros. Opcionalmente, puede usar temporizadores para borrar la información de estado de un valor de estado determinado, con la flexibilidad adicional para emitir valores o desencadenar otra lógica condicional basada en el temporizador.