Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Important
Esta característica está en versión preliminar pública.
En esta página se describe el modo en tiempo real, un tipo de desencadenador para Structured Streaming que permite el procesamiento de datos de latencia ultra baja con latencia de un extremo a otro tan bajo como 5 ms. Este modo está diseñado para cargas de trabajo operativas que requieren respuesta inmediata a los datos de streaming.
El modo en tiempo real está disponible en Databricks Runtime 16.4 LTS y versiones posteriores.
Cargas de trabajo operativas
Las cargas de trabajo de streaming se pueden dividir ampliamente en cargas de trabajo analíticas y cargas de trabajo operativas:
- Las cargas de trabajo analíticas usan la ingesta y la transformación de datos, normalmente siguiendo la arquitectura Medallion (por ejemplo, ingesta de datos en las tablas de bronce, plata y oro).
- Las cargas de trabajo operativas consumen datos en tiempo real, aplican lógica de negocios y desencadenan acciones o decisiones descendentes.
Algunos ejemplos de cargas de trabajo operativas son:
- Bloquear o marcar una transacción de tarjeta de crédito en tiempo real si una puntuación de fraude supera un umbral, en función de factores como ubicación inusual, tamaño de transacción grande o patrones de gasto rápidos.
- Entregar un mensaje promocional cuando los datos de la secuencia de clics muestran que un usuario ha estado explorando jeans durante cinco minutos, ofreciendo un descuento de 25% si compra en los próximos 15 minutos.
En general, las cargas de trabajo operativas se caracterizan por la necesidad de latencia de menos de un segundo de un extremo a otro. Esto se puede lograr con el modo en tiempo real en Apache Spark Structured Streaming.
Cómo el modo en tiempo real logra una latencia baja
El modo en tiempo real mejora la arquitectura de ejecución mediante:
- Ejecutar lotes de larga duración (el valor predeterminado es 5 minutos), en el que los datos se procesan a medida que están disponibles en el origen.
- Todas las fases de la consulta se programan simultáneamente. Esto requiere que el número de espacios de tarea disponibles sea igual o mayor al número de tareas de todas las etapas en un lote.
- Los datos se pasan entre fases tan pronto como se producen usando streaming aleatorio.
Al final del procesamiento de un lote y antes de que se inicie el siguiente lote, los puntos de control de Structured Streaming progresan y hacen que las métricas del último lote esté disponible. Si los lotes son más largos, estas actividades pueden ser menos frecuentes, lo que conduce a repeticiones más largas en caso de error y retraso en la disponibilidad de las métricas. Por otro lado, si los lotes son más pequeños, estas actividades se vuelven más frecuentes, lo que podría afectar a la latencia. Databricks recomienda evaluar el modo en tiempo real frente a su carga de trabajo de destino y requisitos para encontrar el intervalo de activación adecuado.
Configuración del clúster
Para usar el modo en tiempo real en Structured Streaming, debe configurar un trabajo clásico de Lakeflow:
En el área de trabajo de Azure Databricks, haga clic en Nuevo en la esquina superior izquierda. Elija Más y haga clic en Clúster.
Eliminar aceleración de fotones.
Elimine Habilitar escalado automático.
En Rendimiento avanzado, desactivar Usar instancias de spot.
En Modo avanzado y acceso, haga clic en Manual y seleccione Dedicado (anteriormente: Usuario único).
En Spark, escriba lo siguiente en Configuración de Spark:
spark.databricks.streaming.realTimeMode.enabled trueHaga clic en Crear.
Requisitos de tamaño del clúster
Puede ejecutar un trabajo en tiempo real por clúster si el clúster tiene suficientes ranuras de tareas.
Para ejecutarse en modo de baja latencia, el número total de ranuras de tareas disponibles debe ser mayor o igual que el número de tareas en todas las fases de consulta.
Ejemplos de cálculo de ranuras
Canalización sin estado de una sola fase (fuente Kafka + receptor):
Si maxPartitions = 8, necesita al menos 8 ranuras. Si no se establece maxPartitions, use el número de particiones del tema de Kafka.
Canalización con estado en dos fases (fuente Kafka + shuffle):
Si maxPartitions = 8 y particiones aleatorias = 20, necesitará 8 + 20 = 28 ranuras.
Canalización en tres fases (fuente Kafka + aleatorización + repartición):
Con maxPartitions = 8 y dos fases de aleatorización de 20 cada una, necesita 8 + 20 + 20 = 48 ranuras.
Consideraciones clave
Al configurar el clúster, tenga en cuenta lo siguiente:
- A diferencia del modo microlote, las tareas en tiempo real pueden permanecer inactivas mientras esperan datos, por lo que el ajuste de tamaño correcto es esencial para evitar los recursos desperdiciados.
- Apuntar a un nivel objetivo de utilización (por ejemplo, 50%) mediante el ajuste:
-
maxPartitions(para Kafka) -
spark.sql.shuffle.partitions(para etapas aleatorias)
-
- Databricks recomienda establecer maxPartitions para que cada tarea controle varias particiones de Kafka para reducir la sobrecarga.
- Ajuste las ranuras de tareas por trabajador para que se ajusten a la carga de trabajo en trabajos sencillos de una sola fase.
- Para trabajos con mucha aleatoriedad, pruebe hasta encontrar el número mínimo de particiones aleatorias que evite los retrasos y ajuste a partir de ahí. El trabajo no se programará si el clúster no tiene suficientes slots.
Note
Desde Databricks Runtime 16.4 LTS y versiones posteriores, todas las canalizaciones en tiempo real usan el punto de control v2, lo que permite un cambio sin problemas entre los modos de microproceso y en tiempo real.
Configuración de consultas
Debe habilitar el desencadenador en tiempo real para especificar que una consulta debe ejecutarse mediante el modo de baja latencia. Además, los desencadenadores en tiempo real solo se admiten en modo de actualización. Por ejemplo:
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# in PySpark, realTime trigger requires you to specify the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Observability
Anteriormente, la latencia de consulta de un extremo a otro estaba estrechamente vinculada a la duración del lote, lo que hace que la duración del lote sea un buen indicador de la latencia de las consultas. Sin embargo, este método ya no se aplica en modo en tiempo real, lo que requiere enfoques alternativos para medir la latencia. La latencia de un extremo a otro es específica de la carga de trabajo y a veces solo se puede medir con precisión con la lógica de negocios. Por ejemplo, si la marca de tiempo de origen se genera en Kafka, la latencia se puede calcular como la diferencia entre la marca de tiempo de salida de Kafka y la marca de tiempo de origen.
Puede calcular la latencia de un extremo a otro de varias maneras en función de la información parcial recopilada durante el proceso de streaming.
Uso de StreamingQueryProgress
Las métricas siguientes se incluyen en el StreamingQueryProgress evento , que se registra automáticamente en los registros del controlador. También puede acceder a ellos a través de la función de devolución de llamada de StreamingQueryListener de onQueryProgress().
QueryProgressEvent.json() o toString() incluyen métricas de modo en tiempo real adicionales.
- Latencia de procesamiento (processingLatencyMs). El tiempo transcurrido entre el momento en que la consulta en modo tiempo real lee un registro y antes de que se escriba en la siguiente fase o en sentido descendente. Para las consultas de una sola fase, esto mide la misma duración que la latencia E2E. Esta métrica se notifica por tarea.
- Latencia de cola de origen (sourceQueuingLatencyMs). El tiempo transcurrido entre el momento en que un registro se escribe correctamente en un bus de mensajes, por ejemplo, el tiempo de anexión del registro en Kafka, y el momento en que el registro fue leído por primera vez por la consulta en modo de tiempo real. Esta métrica se notifica por tarea.
- Latencia de E2E (e2eLatencyMs). El tiempo transcurrido entre el momento en que el registro se escribe correctamente en un bus de mensajes y el momento en que el registro se escribe en sentido descendente mediante la consulta en modo de tiempo real. Esta métrica se agrega por lote en todos los registros procesados por todas las tareas.
Por ejemplo:
"rtmMetrics" : {
"processingLatencyMs" : {
"P0" : 0,
"P50" : 0,
"P90" : 0,
"P95" : 0,
"P99" : 0
},
"sourceQueuingLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 3
},
"e2eLatencyMs" : {
"P0" : 0,
"P50" : 1,
"P90" : 1,
"P95" : 2,
"P99" : 4
},
Uso de Observe API en trabajos
Observe API ayuda a medir la latencia sin iniciar otro trabajo. Si tiene una marca de tiempo de origen que se aproxima a la hora de llegada de los datos de origen y se pasa antes de llegar al receptor, o si puede encontrar una forma de pasar la marca de tiempo, puede estimar la latencia de cada lote usando la API de Observe:
Python
from datetime import datetime
from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType
@udf(returnType=TimestampType())
def current_timestamp():
return datetime.now()
# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
"latency",
unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
"observedLatency",
avg(col("latency")).alias("avg"),
max(col("latency")).alias("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.
Scala
import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}
val currentTimestampUDF = udf(() => System.currentTimeMillis())
// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
"latency",
col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
name = "observedLatency",
avg(col("latency")).as("avg"),
max(col("latency")).as("max"),
percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.
En este ejemplo, se registra una marca de tiempo actual antes de generar la entrada y la latencia se calcula calculando la diferencia entre esta marca de tiempo y la marca de tiempo del registro. Los resultados se incluyen en los informes de progreso y están disponibles para los oyentes. Esta es una salida de ejemplo:
"observedMetrics" : {
"observedLatency" : {
"avg" : 63.8369765176552,
"max" : 219,
"p99" : 154,
"p50" : 49
}
}
¿Qué se admite?
Environments
| Tipo de clúster | Supported |
|---|---|
| Dedicado (anteriormente: usuario único) | Yes |
| Estándar (anteriormente: compartido) | No |
| Canalizaciones Declarativas Clásicas de Lakeflow Spark | No |
| Lakeflow Spark: Canalizaciones Declarativas sin Servidor | No |
| Serverless | No |
Languages
| Language | Supported |
|---|---|
| Scala | Yes |
| Java | Yes |
| Python | Yes |
Modos de ejecución
| Modo de ejecución | Supported |
|---|---|
| Modo de actualización | Yes |
| Append mode | No |
| Modo completo | No |
Sources
| Sources | Supported |
|---|---|
| Apache Kafka | Yes |
| AWS MSK | Yes |
| Eventhub (mediante el conector de Kafka) | Yes |
| Kinesis | Sí (solo modo EFO) |
| Google Pub/Sub | No |
| Apache Pulsar | No |
Sinks
| Sinks | Supported |
|---|---|
| Apache Kafka | Yes |
| Eventhub (mediante el conector de Kafka) | Yes |
| Kinesis | No |
| Google Pub/Sub | No |
| Apache Pulsar | No |
| Receptores arbitrarios (usando forEachWriter) | Yes |
Operators
| Operators | Supported |
|---|---|
| Operaciones sin estado | |
|
Yes |
|
Yes |
| UDFs | |
|
Sí (con algunas limitaciones) |
|
Sí (con algunas limitaciones) |
| Aggregation | |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
| Funciones de agregaciones | Yes |
| Windowing | |
|
Yes |
|
Yes |
|
No |
| Deduplication | |
|
Sí (el estado es ilimitado) |
|
No |
| Secuencia: unión de tabla | |
|
Yes |
| Secuencia: unión de secuencia | No |
| (plano)MapGroupsWithState | No |
| transformWithState | Sí (con algunas diferencias) |
| union | Sí (con algunas limitaciones) |
| forEach | Yes |
| forEachBatch | No |
| mapPartitions | No (consulte limitación) |
Uso de transformWithState en modo en tiempo real
Para crear aplicaciones con estado personalizadas, Databricks admite transformWithState, una API en Apache Spark Structured Streaming. Consulte Compilación de una aplicación con estado personalizada para obtener más información sobre la API y los fragmentos de código.
Sin embargo, hay algunas diferencias entre el comportamiento de la API en modo tiempo real y las consultas de streaming tradicionales que aprovechan la arquitectura de micro-lotes.
- El método en modo de tiempo real
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)se llama para cada fila.- El
inputRowsiterador devuelve un valor único. En el modo micro-lote, se llama una vez para cada clave y el iteradorinputRowsdevuelve todos los valores de una clave en el micro-lote. - Debe ser consciente de esta diferencia al escribir su código.
- El
- Los temporizadores de eventos no son compatibles con el modo de tiempo real.
- En el modo en tiempo real, los temporizadores se retrasan al activarse en función de la llegada de datos. De lo contrario, si no hay datos, se activa al final del lote de ejecución prolongada. Por ejemplo, si se supone que un temporizador debe activarse a las 10:00:00 y no hay llegada de datos simultáneamente, no se activa. En su lugar, si los datos llegan a las 10:00:10, el temporizador se desencadena después de un retraso de 10 segundos. O bien, si no llegan datos y se está terminando el lote de larga duración, ejecuta el temporizador antes de terminar el lote de larga duración.
UDF de Python
Databricks admite la mayoría de las funciones definidas por el usuario (UDF) de Python en modo en tiempo real:
| Tipo de UDF | Supported |
|---|---|
| UDF sin estado | |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
|
Yes |
| UDF de agrupación con estado (UDAF) | |
|
Yes |
|
No |
| UDF de agrupación no con estado (UDAF) | |
|
No |
|
No |
|
No |
| Función Table | |
|
No |
| UC UDF | No |
Hay varios puntos que se deben tener en cuenta al usar UDF de Python en modo en tiempo real:
- Para minimizar la latencia, configure el tamaño del lote de flecha (spark.sql.execution.arrow.maxRecordsPerBatch) en 1.
- Compensación: esta configuración optimiza la latencia a costa del rendimiento. Para la mayoría de las cargas de trabajo, se recomienda esta configuración.
- Aumente el tamaño del lote solo si se requiere un mayor rendimiento para dar cabida al volumen de entrada, aceptando el posible aumento de la latencia.
- Las UDF y las funciones de Pandas no funcionan bien con un tamaño de lote de flecha de 1.
- Si usa UDF o funciones de Pandas, establezca el tamaño del lote de flecha en un valor mayor (por ejemplo, 100 o superior).
- Tenga en cuenta que esto implica una mayor latencia. Databricks recomienda usar UDF de flecha o función si es posible.
- Debido al problema de rendimiento con pandas, transformWithState solo se admite con la
Rowinterfaz .
Técnicas de optimización
| Technique | Habilitado de forma predeterminada |
|---|---|
| Seguimiento de progreso asincrónico: mueve la escritura al registro de desplazamiento y el registro de confirmación en un subproceso asincrónico, lo que reduce el tiempo entre lotes entre dos microprocesos. Esto puede ayudar a reducir la latencia de las consultas de streaming sin estado. | No |
| Punto de control de estado asincrónico: ayuda a reducir la latencia de las consultas de streaming con estado empezando a procesar el siguiente microproceso tan pronto como se complete el cálculo del microproceso anterior, sin esperar a que se realicen puntos de comprobación de estado. | No |
Limitations
Limitación de origen
Para Kinesis, no se admite el modo de sondeo. Además, las reparticiones frecuentes podrían afectar negativamente a la latencia.
Limitación de unión
Para Union, existen algunas limitaciones:
- No se admite la unión automática:
- Kafka: No puede usar el mismo objeto de marco de datos de origen y marcos de datos derivados de la unión. Solución alternativa: use diferentes tramas de datos que leen desde el mismo origen.
- Kinesis: No se pueden combinar marcos de datos derivados del mismo origen de Kinesis con la misma configuración. Solución alternativa: además de usar diferentes DataFrames, puede asignar un parámetro "consumerName" diferente a cada DataFrame.
- Los operadores con estado (por ejemplo,
aggregate,deduplicate,transformWithState) definidos antes de la Unión no son compatibles. - No se admite la unión con fuentes por lotes.
Limitación de MapPartitions
mapPartitions en Scala y las API de Python similares (mapInPandas, mapInArrow) toman un iterador de toda la partición de entrada y generan un iterador de toda la salida con una asignación arbitraria entre la entrada y la salida. Estas APIs pueden causar problemas de rendimiento en modo en tiempo real de transmisión, al bloquear toda la salida, lo que aumenta la latencia. La semántica de estas APIs no admite bien la propagación de marcas de agua.
Usa UDF escalar combinadas con Transformar tipos de datos complejos o filter para lograr una funcionalidad similar.
Examples
En los ejemplos siguientes se muestran las consultas admitidas.
Consultas sin estado
Se admiten consultas sin estado de una o varias fases.
De origen de Kafka a destino de Kafka
En este ejemplo, lees de una fuente de Kafka y escribes en un sumidero de Kafka.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Reparticionar
En este ejemplo, se lee desde una fuente Kafka, se vuelven a particionar los datos en 20 particiones y se escriben en un receptor Kafka.
Establezca la configuración spark.sql.execution.sortBeforeRepartition de Spark en false antes de usar la repartición.
Python
# Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.repartition(20)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Unión de instantáneas de secuencias (solo difusión)
En este ejemplo, se lee desde Kafka, se unen los datos con una tabla estática y se escribe en un receptor de Kafka. Tenga en cuenta que solo se admiten uniones stream-static que difunden la tabla estática, lo que significa que la tabla estática debe caber en la memoria.
Python
from pyspark.sql.functions import broadcast, expr
# We assume the static table in the path `stateic_table_location` has a column 'lookupKey'.
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "earliest")
.load()
.withColumn("joinKey", expr("CAST(value AS STRING)"))
.join(
broadcast(spark.read.format("parquet").load(static_table_location)),
expr("joinKey = lookupKey")
)
.selectExpr("value AS key", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Fuente Kinesis a receptor Kafka
En este ejemplo, se lee desde una fuente Kinesis y se escribe en un receptor Kafka.
Python
query = (
spark.readStream
.format("kinesis")
.option("region", region_name)
.option("awsAccessKey", aws_access_key_id)
.option("awsSecretKey", aws_secret_access_key)
.option("consumerMode", "efo")
.option("consumerName", consumer_name)
.load()
.selectExpr("parttitionKey AS key", "CAST(data AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kinesis")
.option("region", regionName)
.option("awsAccessKey", awsAccessKeyId)
.option("awsSecretKey", awsSecretAccessKey)
.option("consumerMode", "efo")
.option("consumerName", consumerName)
.load()
.select(
col("partitionKey").alias("key"),
col("data").cast("string").alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Union
En este ejemplo, se unen dos DataFrames de Kafka de dos temas diferentes y se escriben en un receptor Kafka.
Python
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Consultas con estado
Deduplication
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.dropDuplicates(["timestamp", "value"])
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.dropDuplicates("timestamp", "value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
Aggregation
Python
from pyspark.sql.functions import col
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
Unión con agregación
En este ejemplo, primero se uniona dos DataFrames de Kafka de dos temas diferentes y, a continuación, se realiza una agregación. Al final, escribe en el receptor Kafka.
Python
from pyspark.sql.functions import col
df1 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_1)
.load()
)
df2 = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("startingOffsets", "earliest")
.option("subscribe", input_topic_2)
.load()
)
query = (
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("update")
.start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic1)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic2)
.load()
df1.union(df2)
.groupBy(col("timestamp"), col("value"))
.count()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply())
.outputMode(OutputMode.Update())
.start()
TransformWithState
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.{ListState, MapState, StatefulProcessor, OutputMode, TTLConfig, TimeMode, TimerValues, ValueState}
/**
* This processor counts the number of records it has seen for each key using state variables
* with TTLs. It redundantly maintains this count with a value, list, and map state to put load
* on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
* the count for a given grouping key.)
*
* The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
* The source-timestamp is passed through so that we can calculate end-to-end latency. The output
* schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
*
*/
class RTMStatefulProcessor(ttlConfig: TTLConfig)
extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
@transient private var _value: ValueState[Long] = _
@transient private var _map: MapState[Long, String] = _
@transient private var _list: ListState[String] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
// Counts the number of records this key has seen
_value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
_map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
_list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, Long)],
timerValues: TimerValues): Iterator[(String, Long, Long)] = {
inputRows.map { row =>
val key = row._1
val sourceTimestamp = row._2
val oldValue = _value.get()
_value.update(oldValue + 1)
_map.updateValue(oldValue, key)
_list.appendValue(key)
(key, oldValue + 1, sourceTimestamp)
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic)
.load()
.select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
.as[(String, String, Timestamp)]
.groupByKey(row => row._1)
.transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
.as[(String, Long, Long)]
.select(
col("_1").as("key"),
col("_2").as("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.trigger(RealTimeTrigger.apply("5 minutes"))
.outputMode(OutputMode.Update())
.start()
Note
Hay una diferencia entre cómo el modo en tiempo real y otros modos de ejecución en Structured Streaming ejecutan el StatefulProcessor en transformWithState. Consulte Uso de transformWithState en modo en tiempo real
TransformWithState (PySpark, interfaz Row)
from typing import Iterator, Tuple
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType
class RTMStatefulProcessor(StatefulProcessor):
"""
This processor counts the number of records it has seen for each key using state variables
with TTLs. It redundantly maintains this count with a value, list, and map state to put load
on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
the count for a given grouping key.)
The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
The source-timestamp is passed through so that we can calculate end-to-end latency. The output
schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
"""
def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("value", LongType(), True)])
self.value_state = handle.getValueState("value", state_schema, 30000)
map_key_schema = StructType([StructField("key", LongType(), True)])
map_value_schema = StructType([StructField("value", StringType(), True)])
self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
list_schema = StructType([StructField("value", StringType(), True)])
self.list_state = handle.getListState("list", list_schema, 30000)
def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
for row in rows:
# row is a tuple (key, source_timestamp)
key_str = row[0]
source_timestamp = row[1]
old_value = value.get()
if old_value is None:
old_value = 0
self.value_state.update((old_value + 1,))
self.map_state.update((old_value,), (key_str,))
self.list_state.appendValue((key_str,))
yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)
def close(self) -> None:
pass
output_schema = StructType(
[
StructField("key", StringType(), True),
StructField("value", LongType(), True),
StructField("timestamp", TimestampType(), True),
]
)
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
.groupBy("key")
.transformWithState(
statefulProcessor=RTMStatefulProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="processingTime",
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.trigger(realTime="5 minutes")
.outputMode("Update")
.start()
)
Note
Hay una diferencia entre la forma en que el modo en tiempo real y otros modos de ejecución de Structured Streaming ejecutan StatefulProcessor en transformWithState. Consulte Uso de transformWithState en modo en tiempo real
Sinks
Escritura en Postgres a través de foreachSink
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}
/**
* Groups connection properties for
* the JDBC writers.
*
* @param url JDBC url of the form jdbc:subprotocol:subname to connect to
* @param dbtable database table that should be written into
* @param username username for authentication
* @param password password for authentication
*/
class JdbcWriterConfig(
val url: String,
val dbtable: String,
val username: String,
val password: String,
) extends Serializable
/**
* Handles streaming data writes to a database sink via JDBC, by:
* - connecting to the database
* - buffering incoming data rows in batches to reduce write overhead
*
* @param config connection parameters and configuration knobs for the writer
*/
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
extends ForeachWriter[Row] with Serializable {
// The writer currently only supports this hard-coded schema
private val UPSERT_STATEMENT_SQL =
s"""MERGE INTO "${config.dbtable}"
|USING (
| SELECT
| CAST(? AS INTEGER) AS "id",
| CAST(? AS CHARACTER VARYING) AS "data"
|) AS "source"
|ON "test"."id" = "source"."id"
|WHEN MATCHED THEN
| UPDATE SET "data" = "source"."data"
|WHEN NOT MATCHED THEN
| INSERT ("id", "data") VALUES ("source"."id", "source"."data")
|""".stripMargin
private val MAX_BUFFER_SIZE = 3
private val buffer = new Array[Row](MAX_BUFFER_SIZE)
private var bufferSize = 0
private var connection: Connection = _
/**
* Flushes the [[buffer]] by writing all rows in the buffer to the database.
*/
private def flushBuffer(): Unit = {
require(connection != null)
if (bufferSize == 0) {
return
}
var upsertStatement: PreparedStatement = null
try {
upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)
for (i <- 0 until bufferSize) {
val row = buffer(i)
upsertStatement.setInt(1, row.getAs[String]("key"))
upsertStatement.setString(2, row.getAs[String]("value"))
upsertStatement.addBatch()
}
upsertStatement.executeBatch()
connection.commit()
bufferSize = 0
} catch { case e: Exception =>
if (connection != null) {
connection.rollback()
}
throw e
} finally {
if (upsertStatement != null) {
upsertStatement.close()
}
}
}
override def open(partitionId: Long, epochId: Long): Boolean = {
connection = DriverManager.getConnection(config.url, config.username, config.password)
true
}
override def process(row: Row): Unit = {
buffer(bufferSize) = row
bufferSize += 1
if (bufferSize >= MAX_BUFFER_SIZE) {
flushBuffer()
}
}
override def close(errorOrNull: Throwable): Unit = {
flushBuffer()
if (connection != null) {
connection.close()
connection = null
}
}
}
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("subscribe", inputTopic)
.load()
.writeStream
.outputMode(OutputMode.Update())
.trigger(defaultTrigger)
.foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
.start()
Display
Important
Esta característica está disponible en Databricks Runtime 17.1 y versiones posteriores.
Origen de frecuencia de visualización
En este ejemplo, se lee desde una fuente de frecuencia y se muestra el DataFrame de streaming en un cuaderno.
Python
inputDF = (
spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode
val inputDF = spark
.readStream
.format("rate")
.option("numPartitions", 2)
.option("rowsPerSecond", 1)
.load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())