Usare la modalità in tempo reale nelle pipeline dichiarative di Lakeflow Spark

Importante

La modalità in tempo reale nelle pipeline dichiarative di Lakeflow Spark è disponibile in anteprima pubblica in Databricks Runtime 18.1.2 nel canale di anteprima.

La modalità in tempo reale abilita l'elaborazione dei dati a bassa latenza, con latenza end-to-end inferiore a cinque millisecondi. Usare la modalità in tempo reale per i carichi di lavoro operativi che richiedono una risposta immediata ai dati di streaming, ad esempio il rilevamento delle frodi e la personalizzazione in tempo reale.

La modalità in tempo reale è disponibile anche direttamente in Structured Streaming all'esterno delle pipeline. Consultare la Modalità in tempo reale nel Structured Streaming.

Come la modalità in tempo reale raggiunge una bassa latenza

La modalità in tempo reale differisce dall'elaborazione continua standard in tre modi chiave:

  • Batch a esecuzione prolungata: il sistema elabora i dati man mano che diventano disponibili nell'origine all'interno di batch con esecuzione prolungata (il valore predefinito è cinque minuti).
  • Pianificazione simultanea delle fasi: tutte le fasi della query vengono pianificate contemporaneamente. La risorsa di calcolo deve avere slot di attività sufficienti per coprire tutte le fasi contemporaneamente. Vedere Ridimensionamento delle risorse di calcolo.
  • Sequenza casuale di streaming: i dati vengono passati tra le fasi non appena vengono prodotte, anziché attendere il completamento di una fase upstream prima di avviare la fase downstream.

L'intervallo di checkpoint (configurato tramite pipelines.trigger.interval) controlla la frequenza con cui lo stato e gli offset della sorgente vengono salvati in modo persistente in un archivio durevole. Gli intervalli più lunghi riducono il sovraccarico del checkpoint, ma aumentano il tempo di ripristino dopo un errore e ritardano la segnalazione delle metriche. Gli intervalli più brevi migliorano la durabilità, ma aggiungono overhead.

Modalità in tempo reale e pipeline continue

La modalità in tempo reale è un tipo specifico di trigger continuo. La modalità continua è ancora necessaria. La modalità in tempo reale aggiunge ottimizzazioni della latenza a livello di flusso. Per usare la modalità in tempo reale, la pipeline deve essere eseguita per la prima volta in modalità continua. La modalità in tempo reale applica quindi ulteriori ottimizzazioni a livello di flusso per ottenere una latenza inferiore al secondo, oltre quanto consentito dalla normale elaborazione continua.

L'abilitazione della modalità in tempo reale richiede tre passaggi di configurazione:

  1. Impostare la pipeline sulla modalità continua.
  2. Abilitare la modalità in tempo reale a livello di pipeline.
  3. Definire un flusso di aggiornamento in tempo reale.

Requirements

Requirement Value
Databricks Runtime 18.1.2 nel canale di anteprima SDP
Tipo di calcolo Calcolo classico o serverless

Configurare la modalità in tempo reale

Passaggio 1: Impostare la pipeline sulla modalità continua

Nelle impostazioni della pipeline impostare Modalità pipeline su Continua o impostarla nel codice JSON della pipeline:

{
  "continuous": true
}

Passaggio 2: Abilitare la modalità in tempo reale a livello di pipeline

Nelle impostazioni della pipeline, aggiungere la chiave seguente alla configurazione di Spark in Configurazione avanzata > di Spark:

spark.databricks.streaming.realTimeMode.enabled = true

È anche possibile impostare questo valore nel codice JSON della pipeline:

{
  "continuous": true,
  "spark_conf": {
    "spark.databricks.streaming.realTimeMode.enabled": "true"
  }
}

Passaggio 3: Definire un flusso di aggiornamento in tempo reale

La modalità in tempo reale richiede un flusso di aggiornamento. Usare dp.create_sink() per definire la destinazione di output, quindi usare l'elemento @dp.update_flow Decorator con pipelines.trigger impostato su "RealTime" e target che punta al sink.

from pyspark import pipelines as dp

# Define the output sink
dp.create_sink(
    "my_kafka_sink",
    "kafka",
    {
        "kafka.bootstrap.servers": "<bootstrap-servers>",
        "topic": "<output-topic>",
    }
)

# Define the real-time update flow targeting the sink
@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",  # optional; defaults to 5 minutes
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "<bootstrap-servers>")
            .option("subscribe", "<input-topic>")
            .load()
    )

Parametri di configurazione a livello di flusso:

Parametro Obbligatorio Predefinito Description
pipelines.trigger Yes Impostare su "RealTime" per abilitare la modalità in tempo reale per questo flusso.
pipelines.trigger.interval No "5 minutes" Intervallo di checkpoint. Controlla la frequenza con cui viene eseguito il commit dello stato e degli offset. I valori più brevi migliorano la recuperabilità; valori più lunghi riducono il sovraccarico.

Esempi di codice

Da Kafka a Kafka

Leggi da un topic Kafka e scrivi in una destinazione di output Kafka:

from pyspark import pipelines as dp

dp.create_sink("kafka_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="kafka_rtm_flow",
    target="kafka_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def kafka_rtm_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .option("startingOffsets", "latest")
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    )

Arricchisci con un join broadcast

Unire un flusso Kafka a una tabella di ricerca statica. Sono supportati solo i join di broadcast (tra flusso e dati statici). I join tra flussi non sono supportati in modalità tempo reale.

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr

dp.create_sink("enriched_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": enriched_output_topic,
})

@dp.update_flow(
    name="enriched_events_flow",
    target="enriched_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def enriched_events():
    lookup = spark.read.table("catalog.schema.lookup_table")
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .withColumn("event_key", expr("CAST(value AS STRING)"))
            .join(broadcast(lookup), expr("event_key = lookup_key"))
            .select("event_key", "lookup_value", "timestamp")
    )

Aggregazione

Contare gli eventi per chiave utilizzando un groupBy con stato. Impostare spark.sql.shuffle.partitions in modo che corrisponda al numero di partizioni di input per le operazioni con stato:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
        "spark.sql.shuffle.partitions": "8",
    }
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
            .groupBy(col("event_type"))
            .count()
    )

Origini e destinazioni supportate

Connettore Come origine Come destinazione Note
Apache Kafka
AWS MSK Usa l'interfaccia compatibile con Kafka.
Hub eventi di Azure (connettore Kafka) Usa l'interfaccia compatibile con Kafka.
Amazon Kinesis Non supportato Utilizzare solo per la modalità EFO (Enhanced Fan-Out).
Delta Non supportato Non supportato

Dimensionamento del calcolo

È possibile eseguire una pipeline in tempo reale per ogni risorsa di calcolo se il calcolo dispone di slot di attività sufficienti. Gli slot disponibili per i task devono essere sufficienti a coprire tutti i task in tutte le fasi della query.

Tipo di pipeline Configuration Slot di attività necessari
Senza stato a fase singola (origine Kafka + sink) maxPartitions = 8 8
Con stato a due fasi (origine Kafka + shuffle) maxPartitions = 8, partizioni shuffle = 20 28 (8 + 20)
Tre fasi (origine Kafka + due shuffles) maxPartitions = 8, due fasi di mescolamento di 20 ciascuno 48 (8 + 20 + 20)

Se non si imposta maxPartitions, usare il numero di partizioni nel topic Kafka.

Supporto dell'operatore

Categoria Operatore Supportato
Senza stato Selezione, Proiezione
UDFs Scala UDF (Funzioni definite dall'utente in Scala) ✓ (con limitazioni)
UDFs UDF Python ✓ (con limitazioni)
Aggregazione somma, conteggio, massimo, minimo, media
Windowing Rotolamento, scorrimento
Windowing Sessione Non supportato
Deduplicazione dropDuplicates ✓ (stato non associato)
Deduplicazione dropDuplicatesWithinWatermark Non supportato
Joins join di tabella broadcast
Joins Join tra flussi Non supportato
Custom transformWithState ✓ (con differenze comportamentali)
Custom union ✓ (con limitazioni)
Custom forEach Non supportato
Custom flatMapGroupsWithState Non supportato
Custom mapPartitions Non supportato
Custom forEachBatch Non supportato

transformWithState in modalità in tempo reale

transformWithState è supportato in modalità in tempo reale con le differenze seguenti rispetto all'elaborazione micro batch:

  • handleInputRows viene richiamato una volta per riga anziché una volta per ogni chiave per batch. L'iteratore inputRows restituisce un singolo valore per ogni chiamata.
  • I timer basati sul tempo dell'evento non sono supportati. I timer in fase di elaborazione vengono attivati quando un batch a esecuzione prolungata termina se non sono arrivati dati.
  • transformWithStateInPandas non è supportato.

Funzioni definite dall'utente Pandas in modalità in tempo reale

Per ridurre al minimo la latenza con le UDF pandas, impostare spark.sql.execution.arrow.maxRecordsPerBatch su 1. Ciò consente di ottimizzare la latenza a scapito della velocità effettiva. Se anche il throughput è importante, impostare questo valore su 100 o un valore superiore.

Monitorare le prestazioni della modalità in tempo reale

La modalità in tempo reale espone le metriche di latenza in StreamingQueryProgress nel campo latencies. Accedi a queste metriche tramite StreamingQueryListener o esaminando la proprietà lastProgress nella query di streaming.

Metric Description
processingLatencyMs Tempo compreso tra il momento in cui un record viene letto dal flusso e quando viene elaborato completamente dal flusso
sourceQueuingLatencyMs Intervallo di tempo tra il momento in cui un record viene scritto con successo sul bus di messaggi (ad esempio, il tempo di aggiunta al log in Kafka) e il momento in cui viene letto per la prima volta dal flusso
e2eLatencyMs Latenza end-to-end totale da quando il record viene prodotto nell'origine a quando viene elaborato completamente dal flusso

Ogni metrica viene segnalata come percentili p50, p90, p95 e p99.

Limitations

È consigliabile un flusso in tempo reale per ogni pipeline. Sono consentiti più flussi, ma la contesa di slot di attività tra flussi aumenta la latenza.

Per un elenco completo delle limitazioni dell'operatore e dell'origine, vedere Limitazioni della modalità in tempo reale.

Risorse aggiuntive