Use el modo de tiempo real en las canalizaciones declarativas de Spark de Lakeflow

Important

El modo en tiempo real de Lakeflow Spark Declarative Pipelines se encuentra en versión preliminar pública para Databricks Runtime 18.1.2 en el canal de versión preliminar.

El modo en tiempo real permite el procesamiento de datos de latencia ultra baja, con latencia de un extremo a otro tan bajo como cinco milisegundos. Use el modo en tiempo real para cargas de trabajo operativas que requieren respuesta inmediata a los datos de streaming, como la detección de fraudes y la personalización en tiempo real.

El modo en tiempo real también está disponible directamente en Structured Streaming fuera de las canalizaciones. Consulte Modo en tiempo real en Structured Streaming.

Cómo el modo en tiempo real logra una latencia baja

El modo en tiempo real difiere del procesamiento continuo estándar de tres maneras clave:

  • Lotes de larga duración: el sistema procesa los datos a medida que está disponible en el origen en lotes de larga duración (el valor predeterminado es de cinco minutos).
  • Programación de fases simultáneas: todas las fases de consulta se programan al mismo tiempo. El recurso de computación debe tener suficientes ranuras de tareas disponibles para cubrir todas las fases simultáneamente. Consulte Ajuste de tamaño de proceso.
  • Redistribución en flujo: los datos se transfieren entre etapas en cuanto se generan, en lugar de esperar a que una etapa anterior termine antes de iniciar la etapa posterior.

El intervalo de punto de control (configurado mediante pipelines.trigger.interval) controla la frecuencia con la que el estado y los desplazamientos del origen se almacenan de forma persistente en un almacenamiento duradero. Los intervalos más largos reducen la sobrecarga de puntos de control, pero aumentan el tiempo de recuperación después de un error y retrasan los informes de métricas. Los intervalos más cortos mejoran la durabilidad, pero agregan sobrecarga.

Modo en tiempo real y canalizaciones continuas

El modo en tiempo real es un tipo especializado de desencadenador continuo. El modo continuo sigue siendo necesario: el modo en tiempo real agrega optimizaciones de latencia de nivel de flujo en la parte superior. Para usar el modo en tiempo real, la canalización debe ejecutarse primero en modo continuo. Después, el modo en tiempo real aplica optimizaciones adicionales en el nivel de flujo para lograr una latencia de sub-segundo más allá de lo que proporciona el procesamiento continuo estándar.

La habilitación del modo en tiempo real requiere tres pasos de configuración:

  1. Establezca la canalización en modo continuo.
  2. Habilite el modo en tiempo real en el nivel de canalización.
  3. Defina un flujo de actualización en tiempo real.

Requirements

Requirement Value
Databricks Runtime 18.1.2 en el canal de versión preliminar de SDP
Tipo de cálculo Proceso clásico o sin servidor

Configuración del modo en tiempo real

Paso 1: Establecer la canalización en modo continuo

En la configuración de la canalización, establezca Modo de canalización en Continuo o establézcalo en el JSON de la canalización:

{
  "continuous": true
}

Paso 2: Habilitar el modo en tiempo real en el nivel de canalización

En la configuración de la canalización, agregue la siguiente clave a la configuración de Spark en Configuración avanzada > de Spark:

spark.databricks.streaming.realTimeMode.enabled = true

También puede establecerlo en el JSON del pipeline:

{
  "continuous": true,
  "spark_conf": {
    "spark.databricks.streaming.realTimeMode.enabled": "true"
  }
}

Paso 3: Definir un flujo de actualización en tiempo real

El modo en tiempo real requiere un flujo de actualización. Utilice dp.create_sink() para definir el destino de salida y, a continuación, use el decorador @dp.update_flow con pipelines.trigger establecido en "RealTime" y target apuntando al receptor.

from pyspark import pipelines as dp

# Define the output sink
dp.create_sink(
    "my_kafka_sink",
    "kafka",
    {
        "kafka.bootstrap.servers": "<bootstrap-servers>",
        "topic": "<output-topic>",
    }
)

# Define the real-time update flow targeting the sink
@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",  # optional; defaults to 5 minutes
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "<bootstrap-servers>")
            .option("subscribe", "<input-topic>")
            .load()
    )

Parámetros de configuración de nivel de flujo:

Parámetro Obligatorio Predeterminado Description
pipelines.trigger Establézcalo en "RealTime" para habilitar el modo de tiempo real para este flujo.
pipelines.trigger.interval No "5 minutes" Intervalo de punto de comprobación. Controla la frecuencia con la que se confirman el estado y los desplazamientos. Los valores más cortos mejoran la capacidad de recuperación; los valores más largos reducen la sobrecarga.

Ejemplos de código

de Kafka a Kafka

Lea de un tema de Kafka y escriba en un destino de salida de Kafka:

from pyspark import pipelines as dp

dp.create_sink("kafka_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="kafka_rtm_flow",
    target="kafka_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def kafka_rtm_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .option("startingOffsets", "latest")
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    )

Enriquecimiento con una combinación de difusión

Unir un flujo de Kafka con una tabla de búsqueda estática. Solo se admiten las uniones de difusión general (stream-to-static). Las uniones entre flujos no se admiten en modo de tiempo real.

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr

dp.create_sink("enriched_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": enriched_output_topic,
})

@dp.update_flow(
    name="enriched_events_flow",
    target="enriched_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def enriched_events():
    lookup = spark.read.table("catalog.schema.lookup_table")
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .withColumn("event_key", expr("CAST(value AS STRING)"))
            .join(broadcast(lookup), expr("event_key = lookup_key"))
            .select("event_key", "lookup_value", "timestamp")
    )

Agregación

Recuento de eventos mediante una clave con estado groupBy. Establezca spark.sql.shuffle.partitions para que coincida con el número de particiones de entrada para las operaciones con estado:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
        "spark.sql.shuffle.partitions": "8",
    }
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
            .groupBy(col("event_type"))
            .count()
    )

Fuentes y sumideros admitidos

Conector Como origen Como sumidero Notas
Apache Kafka
AWS MSK Usa la interfaz compatible con Kafka.
Azure Event Hubs (conector de Kafka) Usa la interfaz compatible con Kafka.
Amazon Kinesis No soportado Utilizar solo para el modo EFO (Fan-Out mejorado).
Delta No soportado No soportado

Dimensionamiento de los recursos de proceso

Puede ejecutar una canalización en tiempo real por recurso de proceso si el proceso tiene suficientes ranuras de tareas. Las ranuras de tareas disponibles deben cubrir todas las tareas en todas las fases de consulta.

Tipo de canalización Configuration Ranuras de tareas necesarias
Sin estado monofásico (origen de Kafka + sumidero) maxPartitions = 8 8
Con estado de dos etapas (origen de Kafka + reorganización de datos) maxPartitions = 8, particiones aleatorias = 20 28 (8 + 20)
De tres etapas (fuente de Kafka + dos redistribuciones) maxPartitions = 8, dos fases aleatorias de 20 cada una 48 (8 + 20 + 20)

Si no establece maxPartitions, use el número de particiones en el tema de Kafka.

Soporte para operadores

Categoría Operador Supported
Sin estado Selección, proyección
UDFs Scala UDF ✓ (con limitaciones)
UDFs Funciones Definidas por el Usuario (UDF) de Python ✓ (con limitaciones)
Agregación suma, recuento, máximo, mínimo, promedio
Windowing Giratorio, deslizante
Windowing Session No soportado
Eliminación de duplicados dropDuplicates ✓ (estado no acotado)
Eliminación de duplicados dropDuplicatesWithinWatermark No soportado
Joins Combinación de tabla de difusión
Joins Combinación de secuencia a secuencia No soportado
Personalizado transformWithState ✓ (con diferencias de comportamiento)
Personalizado union ✓ (con limitaciones)
Personalizado forEach No soportado
Personalizado flatMapGroupsWithState No soportado
Personalizado mapPartitions No soportado
Personalizado forEachBatch No soportado

transformWithState en modo en tiempo real

transformWithState es compatible con el modo de tiempo real, con las siguientes diferencias con respecto al procesamiento por microlotes:

  • handleInputRows se invoca una vez por fila en lugar de una vez por clave por lote. El inputRows iterador produce un valor único por invocación.
  • No se admiten temporizadores de tiempo de evento. Los temporizadores de tiempo de procesamiento se activan cuando finaliza un lote de ejecución prolongada si no ha llegado ningún dato.
  • No se admite transformWithStateInPandas.

UDF de Pandas en modo en tiempo real

Para minimizar la latencia con UDF de pandas, establezca spark.sql.execution.arrow.maxRecordsPerBatch en 1. Esto optimiza la latencia a costa del rendimiento. Si el rendimiento también es importante, establezca este valor en 100 o superior.

Supervisión del rendimiento del modo en tiempo real

El modo en tiempo real muestra las métricas de latencia en StreamingQueryProgress, en el campo latencies. Acceda a estas métricas mediante un StreamingQueryListener o inspeccionando la propiedad lastProgress en la consulta de transmisión.

Métrica Description
processingLatencyMs Tiempo entre el momento en que el flujo lee un registro y cuando el flujo lo procesa por completo.
sourceQueuingLatencyMs Tiempo entre el momento en que un registro se escribe correctamente en el bus de mensajes (por ejemplo, tiempo de anexión de registros en Kafka) y cuando el flujo lo lee por primera vez.
e2eLatencyMs Latencia total de un extremo a otro desde el momento en que el registro se genera en el origen a cuando el flujo lo procesa por completo.

Cada métrica se informa en los percentiles p50, p90, p95 y p99.

Limitaciones

Se recomienda un flujo en tiempo real por canalización. Se permiten varios flujos, pero la contención de ranuras de tareas entre flujos aumenta la latencia.

Para obtener una lista completa de las limitaciones de operador y origen, consulte Limitaciones del modo en tiempo real.

Recursos adicionales