Condividi tramite


Modalità in tempo reale in Structured Streaming

Important

Questa funzionalità è in Anteprima Pubblica.

Questa pagina descrive la modalità in tempo reale, un tipo di trigger per Structured Streaming che consente l'elaborazione dei dati a latenza ultra bassa con latenza end-to-end fino a 5 ms. Questa modalità è progettata per carichi di lavoro operativi che richiedono una risposta immediata ai dati di streaming.

La modalità in tempo reale è disponibile in Databricks Runtime 16.4 LTS e versioni successive.

Carichi di lavoro operativi

I carichi di lavoro di streaming possono essere suddivisi in generale in carichi di lavoro analitici e carichi di lavoro operativi:

  • I carichi di lavoro analitici usano l'inserimento e la trasformazione dei dati, in genere seguendo l'architettura medallion (ad esempio, l'inserimento di dati nelle tabelle bronze, silver e gold).
  • I carichi di lavoro operativi usano dati in tempo reale, applicano la logica di business e attivano azioni o decisioni downstream.

Ecco alcuni esempi di carichi di lavoro operativi:

  • Blocco o contrassegno di una transazione con carta di credito in tempo reale se un punteggio di frode supera una soglia, in base a fattori come posizione insolita, dimensioni elevate delle transazioni o modelli di spesa rapidi.
  • Il recapito di un messaggio promozionale quando i dati clickstream mostrano che un utente ha esplorato i jeans per cinque minuti, offrendo uno sconto di 25% se acquista nei prossimi 15 minuti.

In generale, i carichi di lavoro operativi sono caratterizzati dalla necessità di una latenza end-to-end inferiore al secondo. Questa operazione può essere ottenuta con la modalità in tempo reale in Apache Spark Structured Streaming.

Come la modalità in tempo reale raggiunge una bassa latenza

La modalità in tempo reale migliora l'architettura di esecuzione tramite:

  • Esecuzione di batch a esecuzione prolungata (il valore predefinito è 5 minuti), in cui i dati vengono elaborati man mano che diventano disponibili nell'origine.
  • Tutte le fasi della query vengono pianificate contemporaneamente. Ciò richiede che il numero di slot di attività disponibili sia uguale o maggiore del numero di attività di tutte le fasi di un batch.
  • I dati vengono trasferiti tra le fasi non appena sono disponibili, utilizzando uno shuffle tramite streaming.

Al termine dell'elaborazione di un batch e prima dell'avvio del batch successivo, i checkpoint structured streaming procedono e rendono disponibili le metriche per l'ultimo batch. Se i batch sono più lunghi, queste attività potrebbero essere meno frequenti, con conseguente ripetizione più lunga in caso di errore e ritardo nella disponibilità delle metriche. D'altra parte, se i batch sono più piccoli, queste attività diventano più frequenti, potenzialmente che influiscono sulla latenza. Databricks consiglia di eseguire il benchmark della modalità in tempo reale rispetto al carico di lavoro di destinazione e ai requisiti per trovare l'intervallo di trigger appropriato.

Configurazione del cluster

Per usare la modalità in tempo reale in Structured Streaming, è necessario configurare un processo Lakeflow classico:

  1. Nell'area di lavoro di Azure Databricks fare clic su Nuovo nell'angolo superiore sinistro. Scegliere Altro e fare clic su Cluster.

  2. Cancella accelerazione Foton.

  3. Deseleziona Abilita scalabilità automatica.

  4. In Prestazioni avanzate deselezionare Usa istanze spot.

  5. In Modalità avanzata e accesso fare clic su Manuale e selezionare Dedicato (in precedenza: Utente singolo).

  6. In Spark immettere quanto segue in Configurazione spark:

    spark.databricks.streaming.realTimeMode.enabled true
    
  7. Clicca su Crea.

Requisiti relativi alle dimensioni del cluster

È possibile eseguire un processo in tempo reale per ogni cluster se il cluster dispone di slot di attività sufficienti.

Per l'esecuzione in modalità a bassa latenza, il numero totale di slot di attività disponibili deve essere maggiore o uguale al numero di attività in tutte le fasi della query.

Esempi di calcolo degli slot

Pipeline senza stato a fase singola (Kafka source + sink):

Se maxPartitions = 8, sono necessari almeno 8 slot. Se maxPartitions non è impostato, usa il numero di partizioni del topic Kafka.

Pipeline con stato a due fasi (origine Kafka + shuffle):

Se maxPartitions = 8 e partizioni di mescolamento = 20, avete bisogno di 8 + 20 = 28 slot.

Pipeline a tre fasi (origine Kafka + shuffle + repartition):

Con maxPartitions = 8 e due stadi di shuffle di 20 ciascuno, sono necessari 8 + 20 + 20 = 48 slot.

Considerazioni chiave

Quando si configura il cluster, tenere presente quanto segue:

  • A differenza della modalità micro batch, le attività in tempo reale possono rimanere inattive durante l'attesa dei dati, quindi il ridimensionamento corretto è essenziale per evitare sprechi di risorse.
  • Puntare a un livello di utilizzo di destinazione (ad esempio, 50%) ottimizzando:
    • maxPartitions (per Kafka)
    • spark.sql.shuffle.partitions (per fasi di rimescolamento)
  • Databricks consiglia di impostare maxPartitions in modo che ogni attività gestisca più partizioni Kafka per ridurre il sovraccarico.
  • Modificare le fasi delle attività per ogni lavoratore in modo che corrispondano al carico di lavoro per processi a una fase.
  • Per i lavori con carichi intensivi di shuffle, provare a trovare il numero minimo di partizioni di shuffle che evitano i backlog e regolare di conseguenza. L'operazione non verrà pianificata se il cluster non dispone di slot sufficienti.

Note

Da Databricks Runtime 16.4 LTS e versioni successive, tutte le pipeline in tempo reale utilizzano checkpoint v2, che consente di passare facilmente tra le modalità in tempo reale e micro-batch.

Configurazione delle query

È necessario abilitare il trigger in tempo reale per specificare che una query deve essere eseguita usando la modalità a bassa latenza. Inoltre, i trigger in tempo reale sono supportati solo in modalità di aggiornamento. Per esempio:

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # in PySpark, realTime trigger requires you to specify the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Observability

In precedenza, la latenza delle query end-to-end era strettamente legata alla durata del batch, rendendo la durata del batch un buon indicatore della latenza delle query. Tuttavia, questo metodo non si applica più in modalità in tempo reale, richiedendo approcci alternativi per misurare la latenza. La latenza end-to-end è specifica del carico di lavoro e talvolta può essere misurata con precisione solo con la logica di business. Ad esempio, se il timestamp di origine viene restituito in Kafka, la latenza può essere calcolata come differenza tra il timestamp di output di Kafka e il timestamp di origine.

È possibile stimare la latenza end-to-end in diversi modi in base alle informazioni parziali raccolte durante il processo di streaming.

Usare StreamingQueryProgress

Nell'evento StreamingQueryProgress sono incluse le metriche seguenti, che vengono registrate automaticamente nei log del driver. È anche possibile accedervi tramite la StreamingQueryListenerfunzione di callback.onQueryProgress() QueryProgressEvent.json() o toString() includono metriche aggiuntive in modalità in tempo reale.

  1. Latenza di elaborazione (processingLatencyMs). Tempo trascorso tra quando la query in modalità in tempo reale legge un record e prima che venga scritto nella fase successiva o downstream. Per le query a fase singola, questa misura la stessa durata della latenza E2E. Questa metrica viene segnalata per ogni attività.
  2. Latenza di accodamento di origine (sourceQueuingLatencyMs). Intervallo di tempo tra il momento in cui un record viene scritto correttamente in un bus dei messaggi, ad esempio l'ora di apposizione del log in Kafka, e quando il record è stato letto per la prima volta dalla query in modalità in tempo reale. Questa metrica viene segnalata per ogni attività.
  3. Latenza E2E (e2eLatencyMs). Il tempo che intercorre tra il momento in cui il record viene scritto correttamente su un bus di messaggi e quello in cui il record viene elaborato a valle dalla query in modalità in tempo reale. Questa metrica viene aggregata per batch in tutti i record elaborati da tutte le attività.

Per esempio:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

Usare l'API Observe nelle attività

L'API Osserva consente di misurare la latenza senza avviare un altro processo. Se si dispone di un timestamp di origine che approssima l'ora di arrivo dei dati di origine e viene passata prima di raggiungere il sink o se è possibile trovare un modo per passare il timestamp, è possibile stimare la latenza di ogni batch usando l'API Osserva:

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

In questo esempio viene registrato un timestamp corrente prima di restituire la voce e la latenza viene stimata calcolando la differenza tra questo timestamp e il timestamp di origine del record. I risultati sono inclusi nei report in corso e resi disponibili ai listener. Ecco un output di esempio:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

Che cos'è supportato?

Environments

Tipo di cluster Supported
Dedicato (in precedenza: utente singolo) Yes
Standard (in precedenza: condiviso) No
Pipeline dichiarative di Lakeflow Spark classiche No
Pipeline dichiarative di Lakeflow Spark serverless No
Serverless No

Languages

Language Supported
Scala Yes
Java Yes
Python Yes

Modalità di esecuzione

Modalità di esecuzione Supported
Modalità di aggiornamento Yes
Append mode No
Modalità completa No

Sources

Sources Supported
Apache Kafka Yes
AWS MSK Yes
Eventhub (uso del connettore Kafka) Yes
Kinesis Sì (solo modalità EFO)
Google Pub/Sub No
Apache Pulsar No

Sinks

Sinks Supported
Apache Kafka Yes
Eventhub (uso del connettore Kafka) Yes
Kinesis No
Google Pub/Sub No
Apache Pulsar No
Destinazioni arbitrarie (con forEachWriter) Yes

Operators

Operators Supported
Operazioni senza stato
  • Selection
Yes
  • Projection
Yes
UDFs
  • Funzioni definite dall'utente scala
Sì (con alcune limitazioni)
  • Python UDF
Sì (con alcune limitazioni)
Aggregation
  • sum
Yes
  • count
Yes
  • max
Yes
  • min
Yes
  • avg
Yes
Funzioni di aggregazione Yes
Windowing
  • Tumbling
Yes
  • Sliding
Yes
  • Session
No
Deduplication
  • dropDuplicates
Sì (lo stato è illimitato)
  • dropDuplicatesWithinWatermark
No
Stream - Join di tabella
  • Tabella broadcast (deve essere piccola)
Yes
Stream - Unione di Stream No
(piatto)MapGroupsWithState No
transformWithState Sì (con alcune differenze)
union Sì (con alcune limitazioni)
forEach Yes
forEachBatch No
mapPartitions No (vedere la limitazione)

Usare transformWithState in modalità in tempo reale

Per la compilazione di applicazioni con stato personalizzate, Databricks supporta transformWithState, un'API in Apache Spark Structured Streaming. Per altre informazioni sull'API e sui frammenti di codice, vedere Creare un'applicazione con stato personalizzata .

Esistono tuttavia alcune differenze tra il comportamento dell'API in modalità in tempo reale e le query di streaming tradizionali che sfruttano l'architettura micro-batch.

  • Il metodo in modalità handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) in tempo reale viene chiamato per ogni riga.
    • L'iteratore inputRows restituisce un singolo valore. In modalità micro batch, viene chiamato una volta per ogni chiave e l'iteratore inputRows restituisce tutti i valori per una chiave nel micro batch.
    • È necessario essere consapevoli di questa differenza durante la scrittura del codice.
  • I timer dell'ora dell'evento non sono supportati in modalità in tempo reale.
  • In modalità in tempo reale, i timer vengono ritardati nel loro avvio in base all'arrivo dei dati. In caso contrario, se non sono presenti dati, questi vengono inviati alla fine del batch a lunga durata. Ad esempio, se un timer dovrebbe essere attivato alle 10:00:00 e non vi è alcun arrivo di dati contemporaneamente, non viene attivato. Se invece i dati arrivano alle 10:00:10, il timer viene attivato con un ritardo di 10 secondi. In alternativa, se non arrivano dati e il batch a esecuzione prolungata viene terminato, viene eseguito il timer prima di terminare il batch a esecuzione prolungata.

UDF Python

Databricks supporta la maggior parte delle funzioni definite dall'utente Python in modalità in tempo reale:

Tipo di funzione definita dall'utente Supported
Funzione definita dall'utente senza stato
Yes
  • Funzione definita dall'utente scalare freccia
Yes
Yes
  • Funzione Arrow (mapInArrow)
Yes
Yes
UDF con raggruppamento con stato (UDAF)
  • transformWithState (NOTA: solo Row interfaccia)
Yes
  • applyInPandasWithState
No
Funzione definita dall'utente con raggruppamento non con stato (UDAF)
  • apply
No
  • applyInArrow
No
  • applyInPandas
No
Funzione Table
No
UC UDF No

Esistono diversi aspetti da considerare quando si usano funzioni definite dall'utente Python in modalità in tempo reale:

  • Per ridurre al minimo la latenza, configurare le dimensioni del batch freccia (spark.sql.execution.arrow.maxRecordsPerBatch) su 1.
    • Compromesso: questa configurazione ottimizza la latenza a scapito della velocità effettiva. Per la maggior parte dei carichi di lavoro, questa impostazione è consigliata.
    • Aumentare le dimensioni del batch solo se è necessaria una velocità effettiva maggiore per supportare il volume di input, accettando il potenziale aumento della latenza.
  • Le funzioni e le funzioni pandas non funzionano bene con le dimensioni del batch Arrow pari a 1.
    • Se si usano funzioni o funzioni definite dall'utente pandas, impostare le dimensioni del batch freccia su un valore superiore, ad esempio 100 o superiore.
    • Si noti che ciò implica una latenza più elevata. Databricks consiglia di usare la funzione o la funzione UDF freccia, se possibile.
  • A causa del problema di prestazioni con pandas, transformWithState è supportato solo con l'interfaccia Row .

Tecniche di ottimizzazione

Technique Abilitata per impostazione predefinita
Tracciamento asincrono dell'avanzamento: sposta la scrittura nel log di offset e il log di commit in un thread asincrono, riducendo il tempo inter-batch tra due micro-batch. Ciò consente di ridurre la latenza delle query di streaming senza stato. No
Checkpoint dello stato asincrono: consente di ridurre la latenza delle query di streaming con stato iniziando a elaborare il micro batch successivo non appena viene completato il calcolo del micro batch precedente, senza attendere il checkpoint dello stato. No

Limitations

Limitazione dell'origine

Per Kinesis, la modalità di polling non è supportata. Inoltre, le ripartizioni frequenti potrebbero influire negativamente sulla latenza.

Limitazione dell'unione

Per l'Unione esistono alcune limitazioni:

  • L'unione automatica non è supportata:
    • Kafka: Non è possibile utilizzare lo stesso oggetto frame di dati di origine e allo stesso tempo unire frame di dati derivati da esso. Soluzione alternativa: usare diversi dataframe letti dalla stessa origine.
    • Kinesis: non è possibile effettuare un'unione di frame di dati derivati dalla stessa origine Kinesis con la stessa configurazione. Soluzione alternativa: oltre a usare diversi dataframe, è possibile assegnare un'opzione "consumerName" diversa a ogni dataframe.
  • Gli operatori con stato, ad esempio aggregate, deduplicate e transformWithState, definiti prima della Union non sono supportati.
  • L'unione con le fonti batch non è supportata.

Limitazione di MapPartitions

mapPartitions in Scala e api Python simili (mapInPandas, mapInArrow) accettano un iteratore dell'intera partizione di input e producono un iteratore dell'intero output con mapping arbitrario tra input e output. Queste API possono causare problemi di prestazioni nella modalità streaming Real-Time bloccando l'intero output, aumentando la latenza. La semantica di queste API non supporta adeguatamente la propagazione della filigrana.

Utilizzare funzioni definite dall'utente scalari combinate con Trasforma tipi di dati complessi o filter per ottenere funzionalità simili.

Examples

Gli esempi seguenti mostrano le query supportate.

Query senza stato

Sono supportate query senza stato sia a singolo stadio che a più stadi.

Da sorgente Kafka a destinazione Kafka

In questo esempio si legge da un'origine Kafka e si scrive in un sink Kafka.

Python
query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("startingOffsets", "earliest")
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .trigger(realTime="5 minutes")
        .outputMode("update")
        .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Ripartizionare

In questo esempio si legge da un'origine Kafka, si ripartiziona i dati in 20 partizioni e si scrive in un sink Kafka.

Impostare la configurazione spark.sql.execution.sortBeforeRepartition di Spark su false prima di usare la ripartizione.

Python
# Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .repartition(20)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

// Sorting is not supported in repartition with real-time mode, so this has to be set to false to achieve low latency.
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .repartition(20)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Join dello snapshot dello stream (solo broadcast)

In questo esempio si legge da Kafka, si uniscono i dati con una tabella statica e si scrive in un sink Kafka. Si noti che sono supportati solo i join flusso-statici che trasmettono la tabella statica, il che significa che la tabella statica deve poter essere contenuta in memoria.

Python
from pyspark.sql.functions import broadcast, expr

# We assume the static table in the path `stateic_table_location` has a column 'lookupKey'.

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("subscribe", input_topic)
    .option("startingOffsets", "earliest")
    .load()
    .withColumn("joinKey", expr("CAST(value AS STRING)"))
    .join(
        broadcast(spark.read.format("parquet").load(static_table_location)),
        expr("joinKey = lookupKey")
    )
    .selectExpr("value AS key", "value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .join(broadcast(spark.read.format("parquet").load(staticTableLocation)), expr("joinKey = lookupKey"))
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Sorgente Kinesis a sink Kafka

In questo esempio, si legge da un'origine Kinesis e si scrive in un sink Kafka.

Python
query = (
    spark.readStream
        .format("kinesis")
        .option("region", region_name)
        .option("awsAccessKey", aws_access_key_id)
        .option("awsSecretKey", aws_secret_access_key)
        .option("consumerMode", "efo")
        .option("consumerName", consumer_name)
        .load()
        .selectExpr("parttitionKey AS key", "CAST(data AS STRING) AS value")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .trigger(realTime="5 minutes")
        .outputMode("update")
        .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kinesis")
      .option("region", regionName)
      .option("awsAccessKey", awsAccessKeyId)
      .option("awsSecretKey", awsSecretAccessKey)
      .option("consumerMode", "efo")
      .option("consumerName", consumerName)
      .load()
      .select(
        col("partitionKey").alias("key"),
        col("data").cast("string").alias("value")
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Union

In questo esempio vengono uniti due dataframe Kafka da due argomenti diversi e si scrive in un sink Kafka.

Python
df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic1)
      .load()

val df2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic2)
      .load()

df1.union(df2)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Query con stato

Deduplication

Python
query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .dropDuplicates(["timestamp", "value"])
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .dropDuplicates("timestamp", "value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

Aggregation

Python
from pyspark.sql.functions import col

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic)
    .load()
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .groupBy(col("timestamp"), col("value"))
      .count()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply("5 minutes"))
      .outputMode(OutputMode.Update())
      .start()

Unione con aggregazione

In questo esempio si unioneno prima due dataframe Kafka da due argomenti diversi e quindi si esegue un'aggregazione. Alla fine, si scrive nel sink Kafka.

Python
from pyspark.sql.functions import col

df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_1)
    .load()
)

df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("startingOffsets", "earliest")
    .option("subscribe", input_topic_2)
    .load()
)

query = (
    df1.union(df2)
    .groupBy(col("timestamp"), col("value"))
    .count()
    .selectExpr("CAST(value AS STRING) AS key", "CAST(count AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker_address)
    .option("topic", output_topic)
    .option("checkpointLocation", checkpoint_location)
    .trigger(realTime="5 minutes")
    .outputMode("update")
    .start()
)
Scala
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val df1 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic1)
      .load()

val df2 = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic2)
      .load()

df1.union(df2)
      .groupBy(col("timestamp"), col("value"))
      .count()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply())
      .outputMode(OutputMode.Update())
      .start()

TransformWithState

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
import org.apache.spark.sql.streaming.{ListState, MapState, StatefulProcessor, OutputMode, TTLConfig, TimeMode, TimerValues, ValueState}

/**
 * This processor counts the number of records it has seen for each key using state variables
 * with TTLs. It redundantly maintains this count with a value, list, and map state to put load
 * on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
 * the count for a given grouping key.)
 *
 * The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
 * The source-timestamp is passed through so that we can calculate end-to-end latency. The output
 * schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
 *
 */

class RTMStatefulProcessor(ttlConfig: TTLConfig)
  extends StatefulProcessor[String, (String, Long), (String, Long, Long)] {
  @transient private var _value: ValueState[Long] = _
  @transient private var _map: MapState[Long, String] = _
  @transient private var _list: ListState[String] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    // Counts the number of records this key has seen
    _value = getHandle.getValueState("value", Encoders.scalaLong, ttlConfig)
    _map = getHandle.getMapState("map", Encoders.scalaLong, Encoders.STRING, ttlConfig)
    _list = getHandle.getListState("list", Encoders.STRING, ttlConfig)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, Long)],
      timerValues: TimerValues): Iterator[(String, Long, Long)] = {
    inputRows.map { row =>
      val key = row._1
      val sourceTimestamp = row._2

      val oldValue = _value.get()
      _value.update(oldValue + 1)
      _map.updateValue(oldValue, key)
      _list.appendValue(key)

      (key, oldValue + 1, sourceTimestamp)
    }
  }
}

spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .select(col("key").cast("STRING"), col("value").cast("STRING"), col("timestamp"))
      .as[(String, String, Timestamp)]
      .groupByKey(row => row._1)
      .transformWithState(new RTMStatefulProcessor(TTLConfig(Duration.ofSeconds(30))), TimeMode.ProcessingTime, OutputMode.Update)
      .as[(String, Long, Long)]
      .select(
            col("_1").as("key"),
            col("_2").as("value")
      )
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .trigger(RealTimeTrigger.apply("5 minutes"))
      .outputMode(OutputMode.Update())
      .start()

Note

Esiste una differenza tra come la modalità in tempo reale e altre modalità di esecuzione in Structured Streaming eseguono il StatefulProcessor in transformWithState. Consultare l'uso di transformWithState in modalità in tempo reale

TransformWithState (PySpark, interfaccia Row)

from typing import Iterator, Tuple

from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StringType, TimestampType, StructField, StructType


class RTMStatefulProcessor(StatefulProcessor):
  """
  This processor counts the number of records it has seen for each key using state variables
  with TTLs. It redundantly maintains this count with a value, list, and map state to put load
  on the state variable cleanup mechanism. (In practice, only one value state is needed to maintain
  the count for a given grouping key.)

  The input schema it expects is (String, Long) which represents a (key, source-timestamp) tuple.
  The source-timestamp is passed through so that we can calculate end-to-end latency. The output
  schema is (String, Long, Long), which represents a (key, count, source-timestamp) 3-tuple.
  """

  def init(self, handle: StatefulProcessorHandle) -> None:
    state_schema = StructType([StructField("value", LongType(), True)])
    self.value_state = handle.getValueState("value", state_schema, 30000)
    map_key_schema = StructType([StructField("key", LongType(), True)])
    map_value_schema = StructType([StructField("value", StringType(), True)])
    self.map_state = handle.getMapState("map", map_key_schema, map_value_schema, 30000)
    list_schema = StructType([StructField("value", StringType(), True)])
    self.list_state = handle.getListState("list", list_schema, 30000)

  def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
    for row in rows:
      # row is a tuple (key, source_timestamp)
      key_str = row[0]
      source_timestamp = row[1]
      old_value = value.get()
      if old_value is None:
        old_value = 0
      self.value_state.update((old_value + 1,))
      self.map_state.update((old_value,), (key_str,))
      self.list_state.appendValue((key_str,))
      yield Row(key=key_str, value=old_value + 1, timestamp=source_timestamp)

  def close(self) -> None:
    pass


output_schema = StructType(
  [
    StructField("key", StringType(), True),
    StructField("value", LongType(), True),
    StructField("timestamp", TimestampType(), True),
  ]
)

query = (
  spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker_address)
  .option("subscribe", input_topic)
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
  .groupBy("key")
  .transformWithState(
    statefulProcessor=RTMStatefulProcessor(),
    outputStructType=output_schema,
    outputMode="Update",
    timeMode="processingTime",
  )
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker_address)
  .option("topic", output_topic)
  .option("checkpointLocation", checkpoint_location)
  .trigger(realTime="5 minutes")
  .outputMode("Update")
  .start()
)

Note

Esiste una differenza tra l'esecuzione StatefulProcessor in modalità in tempo reale e altre modalità di esecuzione in Structured Streaming in transformWithState. Consultare l'uso di transformWithState in modalità in tempo reale

Sinks

Scrittura su Postgres tramite foreachSink

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{ForeachWriter, Row}

/**
 * Groups connection properties for
 * the JDBC writers.
 *
 * @param url JDBC url of the form jdbc:subprotocol:subname to connect to
 * @param dbtable database table that should be written into
 * @param username username for authentication
 * @param password password for authentication
 */
class JdbcWriterConfig(
    val url: String,
    val dbtable: String,
    val username: String,
    val password: String,
) extends Serializable

/**
 * Handles streaming data writes to a database sink via JDBC, by:
 *   - connecting to the database
 *   - buffering incoming data rows in batches to reduce write overhead
 *
 * @param config connection parameters and configuration knobs for the writer
 */
class JdbcStreamingDataWriter(config: JdbcWriterConfig)
  extends ForeachWriter[Row] with Serializable {
  // The writer currently only supports this hard-coded schema
  private val UPSERT_STATEMENT_SQL =
    s"""MERGE INTO "${config.dbtable}"
       |USING (
       |  SELECT
       |    CAST(? AS INTEGER) AS "id",
       |    CAST(? AS CHARACTER VARYING) AS "data"
       |) AS "source"
       |ON "test"."id" = "source"."id"
       |WHEN MATCHED THEN
       |  UPDATE SET "data" = "source"."data"
       |WHEN NOT MATCHED THEN
       |  INSERT ("id", "data") VALUES ("source"."id", "source"."data")
       |""".stripMargin

  private val MAX_BUFFER_SIZE = 3
  private val buffer = new Array[Row](MAX_BUFFER_SIZE)
  private var bufferSize = 0

  private var connection: Connection = _

  /**
   * Flushes the [[buffer]] by writing all rows in the buffer to the database.
   */
  private def flushBuffer(): Unit = {
    require(connection != null)

    if (bufferSize == 0) {
      return
    }

    var upsertStatement: PreparedStatement = null

    try {
      upsertStatement = connection.prepareStatement(UPSERT_STATEMENT_SQL)

      for (i <- 0 until bufferSize) {
        val row = buffer(i)
        upsertStatement.setInt(1, row.getAs[String]("key"))
        upsertStatement.setString(2, row.getAs[String]("value"))
        upsertStatement.addBatch()
      }

      upsertStatement.executeBatch()
      connection.commit()

      bufferSize = 0
    } catch { case e: Exception =>
      if (connection != null) {
        connection.rollback()
      }
      throw e
    } finally {
      if (upsertStatement != null) {
        upsertStatement.close()
      }
    }
  }

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = DriverManager.getConnection(config.url, config.username, config.password)
    true
  }

  override def process(row: Row): Unit = {
    buffer(bufferSize) = row
    bufferSize += 1
    if (bufferSize >= MAX_BUFFER_SIZE) {
      flushBuffer()
    }
  }

  override def close(errorOrNull: Throwable): Unit = {
    flushBuffer()
    if (connection != null) {
      connection.close()
      connection = null
    }
  }
}


spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("subscribe", inputTopic)
      .load()
      .writeStream
      .outputMode(OutputMode.Update())
      .trigger(defaultTrigger)
      .foreach(new JdbcStreamingDataWriter(new JdbcWriterConfig(jdbcUrl, tableName, jdbcUsername, jdbcPassword)))
      .start()

Display

Important

Questa funzionalità è disponibile in Databricks Runtime 17.1 e versioni successive.

Tasso di origine della frequenza di visualizzazione

In questo esempio si legge da un'origine di frequenza e si visualizza il dataframe di streaming in un notebook.

Python
inputDF = (
  spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
)
display(inputDF, realTime="5 minutes", outputMode="update")
Scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

val inputDF = spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())