Condividi tramite


Applicare filigrane per controllare le soglie di elaborazione dati

Questa pagina descrive i concetti di base del watermarking e fornisce consigli per l'uso dei watermark nelle operazioni comuni di streaming stateful. È necessario applicare filigrane alle operazioni di streaming con stato per evitare di espandere infinitamente la quantità di dati mantenuti nello stato, che può introdurre problemi di memoria o aumentare le latenze di elaborazione durante le operazioni di streaming a esecuzione prolungata.

Che cos'è una filigrana?

Structured Streaming utilizza le watermarks per controllare il limite di tempo in cui continuare a elaborare gli aggiornamenti per una determinata entità di stato. Esempi comuni di entità di stato includono:

  • Aggregazioni in un intervallo di tempo.
  • Chiavi univoce in un collegamento tra due flussi.

Quando dichiari una filigrana in un dataframe di streaming, specifichi un campo di timestamp e una soglia. Quando arrivano nuovi dati, il gestore di stato tiene traccia del timestamp più recente nel campo specificato ed elabora tutti i record entro la soglia di ritardo.

Nell'esempio seguente viene applicata una soglia limite di 10 minuti a un conteggio finestrato:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

In questo esempio:

  • La colonna event_time viene usata per definire una filigrana di 10 minuti e una finestra a scorrimento fisso di 5 minuti.
  • Viene raccolto un conteggio per ogni id osservato in ciascuna finestra di 5 minuti non sovrapposta.
  • Le informazioni sullo stato vengono mantenute per ogni conteggio fino a quando la fine della finestra non è più vecchia di 10 minuti rispetto all'ultima event_timeosservata.

Importante

Le soglie limite garantiscono che i record in arrivo entro la soglia specificata vengano elaborati in base alla semantica della query definita. I record in arrivo in ritardo che arrivano al di fuori della soglia specificata potrebbero comunque essere elaborati usando le metriche di query, ma questo non è garantito.

In che modo le filigrane influiscono sul tempo di elaborazione e sulla velocità effettiva?

Le watermark interagiscono con delle modalità di output per controllare quando i dati vengono scritti nel sink. Poiché le filigrane riducono la quantità totale di informazioni sullo stato da elaborare, un loro utilizzo efficace è essenziale per una velocità effettiva efficiente dello streaming con stato.

Nota

Non tutte le modalità di output sono supportate per tutte le operazioni con stato.

Filigrane e modalità di output per le aggregazioni basate su finestre

La tabella seguente illustra in dettaglio l'elaborazione delle query con aggregazione su un timestamp con una filigrana definita.

Modalità output Comportamento
Accodare Le righe vengono scritte nella tabella di destinazione dopo il superamento della soglia limite. Tutte le scritture vengono ritardate in base alla soglia di ritardo. Lo stato dell'aggregazione precedente viene eliminato dopo il superamento della soglia.
Aggiornare Le righe vengono scritte nella tabella di destinazione man mano che i risultati vengono calcolati e possono essere aggiornate e sovrascritte man mano che arrivano nuovi dati. Lo stato dell'aggregazione precedente viene eliminato dopo il superamento della soglia.
Completo Lo stato dell'aggregazione non viene eliminato. La tabella di destinazione viene riscritta con ogni trigger.

Filigrane e output per i join di flusso su flusso

I join tra più flussi supportano solo la modalità di aggiunta e i record corrispondenti vengono scritti in ciascun batch nel momento in cui vengono individuati. Per i inner join, Databricks consiglia di impostare una soglia limite per ogni origine dati di streaming. Ciò consente di eliminare le informazioni sullo stato per i record precedenti. Senza filigrane, Structured Streaming tenta di unire ogni chiave da entrambe le parti del join a ogni attivazione.

Structured Streaming include una semantiche speciali per supportare gli outer join. La filigrana è obbligatoria per gli outer join, in quanto indica quando una chiave deve essere scritta con un valore null dopo essere rimasta senza corrispondenza. Anche se i outer join possono essere utili per registrare record che non corrispondono mai durante l'elaborazione dei dati, perché i join scrivono solo nelle tabelle come operazioni di accodamento, questi dati mancanti non vengono registrati fino al superamento della soglia di ritardo.

Controllare la soglia dei dati tardivi con più criteri limite in Structured Streaming

Quando si usano più input di Structured Streaming, è possibile impostare più filigrane per controllare le soglie di tolleranza per i dati in arrivo in ritardo. La configurazione delle filigrane consente di controllare le informazioni sullo stato e influisce sulla latenza.

Una query di streaming può avere più flussi di input uniti o collegati. Ognuno dei flussi di input può avere una soglia diversa di dati tardivi che deve essere tollerata per le operazioni con stato. Specificare queste soglie usando withWatermarks("eventTime", delay) in ognuno dei flussi di input. Di seguito è riportata una query di esempio con join tra flussi.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

Durante l'esecuzione della query, Structured Streaming tiene traccia singolarmente del tempo massimo di evento visualizzato in ogni flusso di input, calcola le filigrane in base al ritardo corrispondente e sceglie una singola filigrana globale con cui usarle per le operazioni con stato. Per impostazione predefinita, il valore minimo viene scelto come filigrana globale perché impedisce l'eliminazione accidentale dei dati come troppo tardi se uno dei flussi rimane indietro rispetto agli altri (ad esempio, uno dei flussi interrompe la ricezione dei dati a causa di guasti a monte). In altre parole, la filigrana globale si sposta in modo sicuro al ritmo del flusso più lento e l'output della query viene quindi ritardato di conseguenza.

Se si desidera ottenere risultati più veloci, è possibile impostare la politica multipla di watermark per scegliere il valore massimo come watermark globale impostando il spark.sql.streaming.multipleWatermarkPolicy di configurazione SQL su max (il valore predefinito è min). In questo modo la filigrana globale si sposta al ritmo del flusso più veloce. Tuttavia, questa configurazione elimina i dati dai flussi più lenti. Databricks consiglia di usare questa configurazione con giudizio.

Applicare filigrane a operazioni distinte

L'operazione distinct è un operatore con stato che richiede filigrane per impedire la crescita dello stato non associato. Senza filigrane, Structured Streaming tenta di tenere traccia di ogni record univoco a tempo indeterminato, che può causare problemi di memoria o latenze di elaborazione aumentate.

Quando si applica distinct a un dataframe di streaming, è necessario specificare una filigrana in un campo timestamp. La filigrana controlla per quanto tempo il gestore dello stato mantiene i record per la deduplicazione. Dopo il passaggio della soglia limite, i record precedenti vengono rimossi dallo stato.

L'esempio seguente applica una filigrana a un'operazione distinct :

Python

streamingDf = spark.readStream. ...  # columns: eventTime, id, value, ...

# Apply watermark before distinct operation
(streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()
)

Scala

val streamingDf = spark.readStream. ...  // columns: eventTime, id, value, ...

// Apply watermark before distinct operation
streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()

In questo esempio, i record duplicati in arrivo entro 1 ora dall'ultimo rilevamento eventTime vengono rimossi dal flusso. Le informazioni sullo stato per la deduplicazione vengono eliminate dopo il passaggio della soglia.

Importante

Se è necessario deduplicare in colonne specifiche anziché in tutte le colonne, usare dropDuplicates() o dropDuplicatesWithinWatermark() anziché distinct. Per informazioni dettagliate, vedere la sezione che segue.

Eliminare i duplicati all'interno della filigrana

In Databricks Runtime 13.3 LTS o versione successiva è possibile deduplicare i record all'interno di una soglia limite usando un identificatore univoco.

Structured Streaming offre garanzie di elaborazione esattamente una volta, ma non deduplica automaticamente i record dalle origini dati. È possibile usare dropDuplicatesWithinWatermark per deduplicare i record in qualsiasi campo specificato, consentendo di rimuovere i duplicati da un flusso anche se alcuni campi sono diversi, ad esempio l'ora dell'evento o l'ora di arrivo.

È garantito che i record duplicati che arrivano entro la filigrana specificata vengano eliminati. Questa garanzia è rigorosa in una sola direzione e anche i record duplicati che arrivano al di fuori della soglia specificata potrebbero essere eliminati. Per rimuovere tutti i duplicati, è necessario impostare la soglia di ritardo della filigrana superiore alla massima differenza di timestamp tra gli eventi duplicati.

È necessario specificare una filigrana per usare il metodo dropDuplicatesWithinWatermark, come nell'esempio seguente:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(Seq("guid"))