This article contains code examples for custom stateful applications. Databricks recommends using built-in stateful methods for common operations such as aggregations and joins.
The patterns in this article use the transformWithState operator and associated classes, which are available in Public Preview in Databricks Runtime 16.2 and above. See Build a custom stateful application.
Note
Python uses the transformWithStateInPandas operator to provide the same functionality. The examples below provide code in Python and Scala.
Requirements
The transformWithState operator and the related APIs and classes have the following requirements:
- Available in Databricks Runtime 16.2 and above.
- Compute must use dedicated or no-isolation access mode.
- You must use the RocksDB state store provider. Databricks recommends enabling RocksDB as part of the compute configuration.
- transformWithStateInPandas supports standard access mode in Databricks Runtime 16.3 and above.
Note
To enable the RocksDB state store provider for the current session, run the following:
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
Slowly changing dimension (SCD) type 1
The following code is an example of implementing SCD type 1 using transformWithState. SCD type 1 tracks only the most recent value for a given field.
Note
You can use streaming tables and AUTO CDC ... INTO to implement SCD type 1 or type 2 using tables backed by Delta Lake. This example implements SCD type 1 in the state store, which provides lower latency for near real-time applications.
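The SCD type 1 rule itself does not depend on Spark, so it can be sketched in plain Python. The following is a minimal illustration under stated assumptions, not the Databricks implementation: a dict stands in for the per-key value state, and the apply_scd1 function name, the Row alias, and the sample data are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple

# (user, time, location), the same field order as the example schema
Row = Tuple[str, int, str]


def apply_scd1(state: Dict[str, Row], batch: Iterable[Row]) -> List[Row]:
    """Keep only the newest row per user and return the rows that changed state."""
    # Reduce the batch to the newest row per user first.
    newest: Dict[str, Row] = {}
    for row in batch:
        user, time, _ = row
        if user not in newest or time > newest[user][1]:
            newest[user] = row

    emitted: List[Row] = []
    for user, row in newest.items():
        stored = state.get(user)
        # Overwrite only when no state exists or the new row is strictly newer.
        if stored is None or row[1] > stored[1]:
            state[user] = row
            emitted.append(row)
    return emitted


state: Dict[str, Row] = {}
first = apply_scd1(state, [("ana", 1, "Lima"), ("ana", 3, "Quito")])
second = apply_scd1(state, [("ana", 2, "Bogota")])  # older than the stored row
```

Because older rows never overwrite newer state, replaying an out-of-order batch leaves the stored value unchanged and emits nothing.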
Python
# Import the necessary libraries
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType
from typing import Iterator

# Set the state store provider to RocksDB
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

# Define the output schema for the streaming query
output_schema = StructType([
    StructField("user", StringType(), True),
    StructField("time", LongType(), True),
    StructField("location", StringType(), True)
])

# Define a custom StatefulProcessor for slowly changing dimension type 1 (SCD1) operations
class SCDType1StatefulProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        self.handle = handle
        # Define the schema for the state value
        value_state_schema = StructType([
            StructField("user", StringType(), True),
            StructField("time", LongType(), True),
            StructField("location", StringType(), True)
        ])
        # Initialize the state to store the latest location for each user
        self.latest_location = handle.getValueState("latestLocation", value_state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        # Find the row with the maximum time value
        max_row = None
        max_time = float('-inf')
        for pdf in rows:
            for _, pd_row in pdf.iterrows():
                time_value = pd_row["time"]
                if time_value > max_time:
                    max_time = time_value
                    max_row = tuple(pd_row)

        # Check whether state exists and update if necessary
        exists = self.latest_location.exists()
        if not exists or max_row[1] > self.latest_location.get()[1]:
            # Update the state with the new max row
            self.latest_location.update(max_row)
            # Yield the updated row
            yield pd.DataFrame(
                {"user": (max_row[0],), "time": (max_row[1],), "location": (max_row[2],)}
            )
        else:
            # Yield an empty DataFrame if no update is needed
            yield pd.DataFrame()

    def close(self) -> None:
        # No cleanup needed
        pass

# Apply the stateful transformation to the input DataFrame
(df.groupBy("user")
    .transformWithStateInPandas(
        statefulProcessor=SCDType1StatefulProcessor(),
        outputStructType=output_schema,
        outputMode="Update",
        timeMode="None",
    )
    .writeStream...  # Continue with stream writing configuration
)
Scala
// Import the necessary libraries
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming._

// Define a case class to represent user location data
case class UserLocation(
    user: String,
    time: Long,
    location: String)

// Define a stateful processor for slowly changing dimension type 1 (SCD1) operations
class SCDType1StatefulProcessor extends StatefulProcessor[String, UserLocation, UserLocation] {

  // Transient value state to store the latest location for each user
  @transient private var _latestLocation: ValueState[UserLocation] = _

  // Initialize the state store
  override def init(
      outputMode: OutputMode,
      timeMode: TimeMode): Unit = {
    // Create a value state named "locationState" using UserLocation encoder
    // TTLConfig.NONE means the state has no expiration
    _latestLocation = getHandle.getValueState[UserLocation]("locationState",
      Encoders.product[UserLocation], TTLConfig.NONE)
  }

  // Process input rows and update state
  override def handleInputRows(
      key: String,
      inputRows: Iterator[UserLocation],
      timerValues: TimerValues): Iterator[UserLocation] = {
    // Find the location with the maximum timestamp from input rows
    val maxNewLocation = inputRows.maxBy(_.time)

    // Update state and emit output if:
    // 1. No previous state exists, or
    // 2. New location has a more recent timestamp than the stored one
    if (_latestLocation.getOption().isEmpty || maxNewLocation.time > _latestLocation.get().time) {
      _latestLocation.update(maxNewLocation)
      Iterator.single(maxNewLocation) // Emit the updated location
    } else {
      Iterator.empty // No update needed, emit nothing
    }
  }
}
Slowly changing dimension (SCD) type 2
The following notebooks contain an example of implementing SCD type 2 using transformWithState in Python or Scala.
SCD type 2 Python
SCD type 2 Scala
Downtime detector
transformWithState implements timers so that you can take action based on elapsed time, even if no records are processed for a given key in a microbatch.
The following example implements a pattern for a downtime detector. Each time a new value is seen for a given key, it updates the lastSeen state value, clears any existing timers, and sets a new timer for the future.
When a timer expires, the application emits the time elapsed since the last observed event for the key. It then sets a new timer to emit another update 10 seconds later.
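The timer life cycle described above can be simulated without Spark. The following plain-Python sketch is only an illustration: the DowntimeSim class, its method names, and the sample timestamps are hypothetical, and two dicts stand in for the value state and the timer registry. The 5-second initial timer and the 10-second re-arm interval mirror the values used in the Python example that follows.

```python
from typing import Dict, List, Tuple

TIMEOUT_MS = 5_000   # first timer after a new event
REPEAT_MS = 10_000   # re-arm interval after a timer fires


class DowntimeSim:
    """Plain-Python stand-in for per-key state plus one timer per key."""

    def __init__(self) -> None:
        self.last_seen: Dict[str, int] = {}  # key -> last event time (ms)
        self.timers: Dict[str, int] = {}     # key -> timer expiry time (ms)

    def on_event(self, key: str, event_ms: int, now_ms: int) -> None:
        # Only a strictly newer event resets the state and the timer.
        if event_ms > self.last_seen.get(key, 0):
            self.last_seen[key] = event_ms
            self.timers[key] = now_ms + TIMEOUT_MS  # replaces any old timer

    def advance(self, now_ms: int) -> List[Tuple[str, int]]:
        """Fire expired timers, return (key, downtime_ms), and re-arm them."""
        fired = []
        for key, expiry in list(self.timers.items()):
            if expiry <= now_ms:
                fired.append((key, now_ms - self.last_seen[key]))
                self.timers[key] = now_ms + REPEAT_MS
        return fired


sim = DowntimeSim()
sim.on_event("sensor-1", event_ms=1_000, now_ms=1_000)
quiet = sim.advance(now_ms=4_000)    # the timer (6_000) has not expired yet
first = sim.advance(now_ms=6_000)    # fires: 6_000 - 1_000
second = sim.advance(now_ms=16_000)  # re-armed timer fires: 16_000 - 1_000
```

The reported downtime keeps growing across timer firings until a new event for the key resets lastSeen.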
Python
# Import the necessary libraries
import pandas as pd
from datetime import datetime
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, TimestampType
from typing import Iterator

class DownTimeDetectorStatefulProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Define the schema for the state value (timestamp)
        state_schema = StructType([StructField("value", TimestampType(), True)])
        self.handle = handle
        # Initialize state to store the last seen timestamp for each key
        self.last_seen = handle.getValueState("last_seen", state_schema)

    def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
        # The state value is a one-field tuple that holds the last seen timestamp
        latest_from_existing = self.last_seen.get()[0]
        # Calculate downtime duration in milliseconds since the last seen event
        # (a naive timestamp is interpreted in the local time zone)
        downtime_duration = timerValues.getCurrentProcessingTimeInMs() - int(latest_from_existing.timestamp() * 1000)
        # Register a new timer for 10 seconds in the future
        self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)
        # Yield a DataFrame with the key and downtime duration
        yield pd.DataFrame(
            {
                "id": key,
                "timeValues": str(downtime_duration),
            }
        )

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        # Find the row with the maximum timestamp (column order: id, time)
        max_row = max(
            (tuple(row) for pdf in rows for row in pdf.itertuples(index=False)),
            key=lambda row: row[1],
        )

        # Get the latest timestamp from the existing state or use epoch start if a timestamp doesn't exist
        if self.last_seen.exists():
            latest_from_existing = self.last_seen.get()[0]
        else:
            latest_from_existing = datetime.fromtimestamp(0)

        # If the new data is more recent than the existing state
        if latest_from_existing < max_row[1]:
            # Delete all existing timers
            for timer in self.handle.listTimers():
                self.handle.deleteTimer(timer)
            # Update the last seen timestamp
            self.last_seen.update((max_row[1],))
            # Register a new timer for 5 seconds in the future
            self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 5000)

        # Get current processing time in milliseconds
        timestamp_in_millis = str(timerValues.getCurrentProcessingTimeInMs())
        # Yield a DataFrame with the key and current timestamp
        yield pd.DataFrame({"id": key, "timeValues": timestamp_in_millis})

    def close(self) -> None:
        # No cleanup needed
        pass
Scala
import java.sql.Timestamp
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming._

// The (String, Timestamp) schema represents an (id, time). We want to do downtime
// detection on every single unique sensor, where each sensor has a sensor ID.
class DowntimeDetector(duration: Duration) extends
    StatefulProcessor[String, (String, Timestamp), (String, Duration)] {

  @transient private var _lastSeen: ValueState[Timestamp] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    _lastSeen = getHandle.getValueState[Timestamp]("lastSeen", Encoders.TIMESTAMP, TTLConfig.NONE)
  }

  // The logic here is as follows: find the largest timestamp seen so far. Set a timer for
  // the duration later.
  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, Timestamp)],
      timerValues: TimerValues): Iterator[(String, Duration)] = {
    val latestRecordFromNewRows = inputRows.maxBy(_._2.getTime)

    // Use getOrElse to initiate state variable if it doesn't exist
    val latestTimestampFromExistingRows = _lastSeen.getOption().getOrElse(new Timestamp(0))
    val latestTimestampFromNewRows = latestRecordFromNewRows._2

    if (latestTimestampFromNewRows.after(latestTimestampFromExistingRows)) {
      // Cancel the one existing timer, since we have a new latest timestamp.
      // We call "listTimers()" because we don't know ahead of time what
      // the timestamp of the existing timer will be.
      getHandle.listTimers().foreach(timer => getHandle.deleteTimer(timer))
      _lastSeen.update(latestTimestampFromNewRows)
      // Use timerValues to schedule a timer using processing time.
      getHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + duration.toMillis)
    } else {
      // No new latest timestamp, so there is no need to update the state or set a timer.
    }

    Iterator.empty
  }

  override def handleExpiredTimer(
      key: String,
      timerValues: TimerValues,
      expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, Duration)] = {
    val latestTimestamp = _lastSeen.get()
    val downtimeDuration = new Duration(
      timerValues.getCurrentProcessingTimeInMs() - latestTimestamp.getTime)

    // Register another timer that will fire in 10 seconds.
    // Timers can be registered anywhere but init()
    getHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)

    Iterator((key, downtimeDuration))
  }
}
Migrate existing state information
The following example shows how to implement a stateful application that accepts an initial state. You can add initial state handling to any stateful application, but the initial state can only be set when the application is first initialized.
This example uses the statestore reader to load existing state information from a checkpoint path. An example use case for this pattern is migrating from legacy stateful applications to transformWithState.
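The idea of seeding state once and then accumulating on top of it can be shown in plain Python. This sketch is only an illustration under stated assumptions: a dict stands in for the state store, and the seed_state and accumulate function names and the sample totals are hypothetical.

```python
from typing import Dict, Iterable, Tuple


def seed_state(initial: Iterable[Tuple[str, int]]) -> Dict[str, int]:
    """Build the starting state once, before any input is processed."""
    return {key: value for key, value in initial}


def accumulate(state: Dict[str, int], batch: Iterable[Tuple[str, str]]) -> Dict[str, int]:
    """Add each (id, value) input row to the running total for its id."""
    for key, value in batch:
        state[key] = state.get(key, 0) + int(value)
    return state


# Totals carried over from a legacy application (made-up numbers).
state = seed_state([("a", 10), ("b", 5)])
accumulate(state, [("a", "2"), ("c", "7"), ("a", "1")])
```

Keys present in the initial state continue from their migrated totals, while keys seen for the first time start from zero.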
Python
# Import the necessary libraries
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType, StringType, IntegerType
from typing import Iterator

# Set RocksDB as the state store provider for better performance
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

"""
Input schema is as below

input_schema = StructType([
    StructField("id", StringType(), True),
    StructField("value", StringType(), True)
])
"""

# Define the output schema for the streaming query
output_schema = StructType([
    StructField("id", StringType(), True),
    StructField("accumulated", StringType(), True)
])

class AccumulatedCounterStatefulProcessorWithInitialState(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Define the schema for the state value (integer)
        state_schema = StructType([StructField("value", IntegerType(), True)])
        # Initialize state to store the accumulated counter for each id
        self.counter_state = handle.getValueState("counter_state", state_schema)
        self.handle = handle

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        # Check if state exists for the current key
        exists = self.counter_state.exists()
        if exists:
            value_row = self.counter_state.get()
            existing_value = value_row[0]
        else:
            existing_value = 0

        accumulated_value = existing_value

        # Process input rows and accumulate values
        for pdf in rows:
            # Convert the NumPy integer to a plain Python int before storing it
            value = int(pdf["value"].astype(int).sum())
            accumulated_value += value

        # Update the state with the new accumulated value
        self.counter_state.update((accumulated_value,))

        # Yield a DataFrame with the key and accumulated value
        yield pd.DataFrame({"id": key, "accumulated": str(accumulated_value)})

    def handleInitialState(self, key, initialState, timerValues) -> None:
        # Initialize the state with the provided initial value
        init_val = initialState.at[0, "initVal"]
        self.counter_state.update((init_val,))

    def close(self) -> None:
        # No cleanup needed
        pass

# Load initial state from a checkpoint directory
initial_state = (spark.read.format("statestore")
    .option("path", "$checkpointsDir")
    .load()
)

# Apply the stateful transformation to the input DataFrame
(df.groupBy("id")
    .transformWithStateInPandas(
        statefulProcessor=AccumulatedCounterStatefulProcessorWithInitialState(),
        outputStructType=output_schema,
        outputMode="Update",
        timeMode="None",
        initialState=initial_state,
    )
    .writeStream...  # Continue with stream writing configuration
)
Scala
// Import the necessary libraries
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders, DataFrame}
import org.apache.spark.sql.types._

// Define a stateful processor that can handle the initial state
class InitialStateStatefulProcessor extends StatefulProcessorWithInitialState[String, (String, String, String), (String, String), (String, Int)] {

  // Transient value state to store the accumulated value
  @transient protected var valueState: ValueState[Int] = _

  // Initialize the state store
  override def init(
      outputMode: OutputMode,
      timeMode: TimeMode): Unit = {
    // Create a value state named "valueState" using Int encoder
    // TTLConfig.NONE means the state has no automatic expiration
    valueState = getHandle.getValueState[Int]("valueState",
      Encoders.scalaInt, TTLConfig.NONE)
  }

  // Process input rows and update state
  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, String, String)],
      timerValues: TimerValues): Iterator[(String, String)] = {
    var existingValue = 0

    // Retrieve existing value from state if it exists
    if (valueState.exists()) {
      existingValue += valueState.get()
    }

    var accumulatedValue = existingValue

    // Accumulate values from input rows
    for (row <- inputRows) {
      accumulatedValue += row._2.toInt
    }

    // Update the state with the new accumulated value
    valueState.update(accumulatedValue)

    // Return the key and accumulated value as a string
    Iterator((key, accumulatedValue.toString))
  }

  // Handle initial state when provided
  override def handleInitialState(
      key: String, initialState: (String, Int), timerValues: TimerValues): Unit = {
    // Update the state with the initial value
    valueState.update(initialState._2)
  }
}
Migrate Delta table to state store for initialization
The following notebooks contain an example of initializing state store values from a Delta table using transformWithState in Python or Scala.
Initialize state from Delta Python
Initialize state from Delta Scala
Session tracking
The following notebooks contain an example of session tracking using transformWithState in Python or Scala.
Session tracking Python
Session tracking Scala
Custom stream-stream join using transformWithState
The following code demonstrates a custom stream-stream join across multiple streams using transformWithState. You might use this approach instead of a built-in join operator for the following reasons:
- You need to use update output mode, which stream-stream joins do not support. This is especially useful for lower-latency applications.
- You need to continue performing joins for late-arriving rows (after the watermark has expired).
- You need to perform many-to-many stream-stream joins.
This example gives the user full control over state expiration logic, which allows the retention period to be extended dynamically to handle out-of-order events even after the watermark.
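The delayed-lookup idea behind this join can be sketched in plain Python. The following is only an illustration, not the Spark implementation: the DelayedJoin class, its method names, and the sample users are hypothetical, dicts stand in for map state, and this sketch drops an activity event after emitting it, which is a design choice of the sketch.

```python
from typing import Dict, List, Optional, Tuple

DELAY_MS = 10_000  # lookup delay, matching the 10-second timer in the examples


class DelayedJoin:
    def __init__(self) -> None:
        self.activity: Dict[str, Tuple[str, int]] = {}  # user -> (event_type, due_ms)
        self.profiles: Dict[str, str] = {}              # user -> profile name

    def on_activity(self, user: str, event_type: str, now_ms: int) -> None:
        # Buffer the event and defer the lookup instead of joining immediately.
        self.activity[user] = (event_type, now_ms + DELAY_MS)

    def on_profile(self, user: str, name: str) -> None:
        self.profiles[user] = name

    def expire(self, now_ms: int) -> List[Tuple[str, str, Optional[str]]]:
        """Emit (user, event_type, profile_name) for every due event."""
        out = []
        for user, (event_type, due_ms) in list(self.activity.items()):
            if due_ms <= now_ms:
                out.append((user, event_type, self.profiles.get(user)))
                del self.activity[user]
        return out


join = DelayedJoin()
join.on_activity("u1", "click", now_ms=0)
join.on_profile("u1", "Ana")  # the profile arrives after the event
join.on_activity("u2", "view", now_ms=0)
early = join.expire(now_ms=5_000)
joined = join.expire(now_ms=10_000)
```

Deferring the lookup gives the other streams time to arrive, so an event can still be enriched with a profile that was received after it.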
Python
# Import the necessary libraries
import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from typing import Iterator

# Define output schema for the joined data
output_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("profile_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("preferred_category", StringType(), True)
])

class CustomStreamJoinProcessor(StatefulProcessor):
    # Initialize stateful storage for user profiles, preferences, and event tracking.
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Keep the handle so that handleInputRows can register timers
        self.handle = handle
        # Define schemas for different types of state data
        profile_schema = StructType([
            StructField("name", StringType(), True),
            StructField("email", StringType(), True),
            StructField("updated_at", TimestampType(), True)
        ])
        preferences_schema = StructType([
            StructField("preferred_category", StringType(), True),
            StructField("updated_at", TimestampType(), True)
        ])
        activity_schema = StructType([
            StructField("event_type", StringType(), True),
            StructField("timestamp", TimestampType(), True)
        ])
        map_state_key_schema = StructType([
            StructField("user_id", StringType(), True)
        ])
        # Initialize state storage for user profiles, preferences, and activity
        self.profile_state = handle.getMapState("user_profiles", map_state_key_schema, profile_schema)
        self.preferences_state = handle.getMapState("user_preferences", map_state_key_schema, preferences_schema)
        self.activity_state = handle.getMapState("user_activity", map_state_key_schema, activity_schema)

    # Process incoming events and update the state
    def handleInputRows(self, key, rows: Iterator[pd.DataFrame], timerValues) -> Iterator[pd.DataFrame]:
        df = pd.concat(rows, ignore_index=True)
        for _, row in df.iterrows():
            # Map state keys and values are tuples that follow the schemas defined in init.
            # The input column names used here are assumed to match those schemas.
            user_key = (row["user_id"],)
            if pd.notna(row.get("event_type")):  # User activity event
                self.activity_state.updateValue(user_key, (row["event_type"], row.get("timestamp")))
                # Set a timer to process this event after a 10-second delay
                self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + (10 * 1000))
            elif pd.notna(row.get("name")):  # Profile update
                self.profile_state.updateValue(user_key, (row["name"], row.get("email"), row.get("updated_at")))
            elif pd.notna(row.get("preferred_category")):  # Preference update
                self.preferences_state.updateValue(user_key, (row["preferred_category"], row.get("updated_at")))
        # No immediate output; processing will happen when the timer expires
        return iter([])

    # Return the stored tuple for a key, or None if the key is not in the map state
    def _lookup(self, state, key):
        return state.getValue(key) if state.containsKey(key) else None

    # Perform lookup after delay, handling out-of-order and late-arriving events.
    def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
        # Retrieve stored state for the user; key is the grouping key tuple (user_id,)
        user_activity = self._lookup(self.activity_state, key)
        user_profile = self._lookup(self.profile_state, key)
        user_preferences = self._lookup(self.preferences_state, key)
        if user_activity:
            # Combine data from different states into a single output row
            output_row = {
                "user_id": key[0],
                "event_type": user_activity[0],
                "timestamp": user_activity[1],
                "profile_name": user_profile[0] if user_profile else None,
                "email": user_profile[1] if user_profile else None,
                "preferred_category": user_preferences[0] if user_preferences else None
            }
            return iter([pd.DataFrame([output_row])])
        return iter([])

    def close(self) -> None:
        # No cleanup needed
        pass

# Apply transformWithState to the input DataFrame
(df.groupBy("user_id")
    .transformWithStateInPandas(
        statefulProcessor=CustomStreamJoinProcessor(),
        outputStructType=output_schema,
        outputMode="Append",
        timeMode="ProcessingTime"
    )
    .writeStream...  # Continue with stream writing configuration
)
Scala
// Import the necessary libraries
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming._
import java.sql.Timestamp

// Input and state case classes. Their shapes are assumed here, inferred from
// the fields that the processor below reads.
case class UserEvent(user_id: String, event_type: String, timestamp: Timestamp)
case class UserProfile(name: String, email: String)
case class UserPreferences(preferred_category: String)

// Define a case class for enriched user events, combining user activity with profile and preference data
case class EnrichedUserEvent(
    user_id: String,
    event_type: String,
    timestamp: Timestamp,
    profile_name: Option[String],
    email: Option[String],
    preferred_category: Option[String]
)

// Custom stateful processor for stream-stream join
class CustomStreamJoinProcessor extends StatefulProcessor[String, UserEvent, EnrichedUserEvent] {

  // Transient state variables to store user profiles, preferences, and activities
  @transient private var _profileState: MapState[String, UserProfile] = _
  @transient private var _preferencesState: MapState[String, UserPreferences] = _
  @transient private var _activityState: MapState[String, UserEvent] = _

  // Initialize state stores. Map state takes an encoder for the map key and one for the value.
  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    _profileState = getHandle.getMapState[String, UserProfile]("profileState", Encoders.STRING, Encoders.product[UserProfile], TTLConfig.NONE)
    _preferencesState = getHandle.getMapState[String, UserPreferences]("preferencesState", Encoders.STRING, Encoders.product[UserPreferences], TTLConfig.NONE)
    _activityState = getHandle.getMapState[String, UserEvent]("activityState", Encoders.STRING, Encoders.product[UserEvent], TTLConfig.NONE)
  }

  // Handle incoming user events
  override def handleInputRows(
      key: String,
      inputRows: Iterator[UserEvent],
      timerValues: TimerValues): Iterator[EnrichedUserEvent] = {
    inputRows.foreach { event =>
      if (event.event_type.nonEmpty) {
        // Update activity state and set a timer for 10 seconds in the future
        _activityState.updateValue(key, event)
        getHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)
      }
    }
    Iterator.empty
  }

  // Return the stored value for a key, or None if the key is not in the map state
  private def lookup[V](state: MapState[String, V], key: String): Option[V] =
    if (state.containsKey(key)) Some(state.getValue(key)) else None

  // Handle expired timers to produce enriched events
  override def handleExpiredTimer(
      key: String,
      timerValues: TimerValues,
      expiredTimerInfo: ExpiredTimerInfo): Iterator[EnrichedUserEvent] = {
    // Retrieve user data from state stores
    val userEvent = lookup(_activityState, key)
    val userProfile = lookup(_profileState, key)
    val userPreferences = lookup(_preferencesState, key)

    if (userEvent.isDefined) {
      // Create and return an enriched event if user activity exists
      val enrichedEvent = EnrichedUserEvent(
        user_id = key,
        event_type = userEvent.get.event_type,
        timestamp = userEvent.get.timestamp,
        profile_name = userProfile.map(_.name),
        email = userProfile.map(_.email),
        preferred_category = userPreferences.map(_.preferred_category)
      )
      Iterator.single(enrichedEvent)
    } else {
      Iterator.empty
    }
  }
}

// Apply the custom stateful processor to the input DataFrame
val enrichedStream = df
  .groupByKey(_.user_id)
  .transformWithState(
    new CustomStreamJoinProcessor(),
    TimeMode.ProcessingTime(),
    OutputMode.Append()
  )

// Write the enriched stream to Delta Lake
enrichedStream.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/checkpoints")
  .start("/mnt/delta/enriched_events")
Top-K calculation
The following example uses a ListState class with a priority queue to maintain and update the top K elements of a stream for each group key in near real time.
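The priority-queue step can be sketched in plain Python with the standard heapq module. This is a minimal illustration under stated assumptions, not the Databricks example: a Python list stands in for the per-key list state, and the update_top_k function name and the sample values are hypothetical.

```python
import heapq
from typing import List


def update_top_k(current: List[float], new_values: List[float], k: int) -> List[float]:
    """Merge new values into the stored top-K list using a size-K min-heap."""
    heap = list(current)
    heapq.heapify(heap)
    for value in new_values:
        if len(heap) < k:
            heapq.heappush(heap, value)
        elif value > heap[0]:
            # Replace the smallest of the current top K.
            heapq.heapreplace(heap, value)
    # Store in descending order so that the list reads as a ranking.
    return sorted(heap, reverse=True)


stored: List[float] = []  # stand-in for the per-key list state
stored = update_top_k(stored, [5, 1, 9, 3], k=3)
after_first_batch = list(stored)
stored = update_top_k(stored, [7, 2], k=3)
```

A min-heap keeps the smallest retained element at the root, so each new value needs only one comparison to decide whether it belongs in the top K.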