Nota
L'accesso a questa pagina richiede l'autorizzazione. Puoi provare ad accedere o a cambiare directory.
L'accesso a questa pagina richiede l'autorizzazione. Puoi provare a cambiare directory.
Questa pagina illustra le query di streaming strutturato con stato, incluse operazioni stateful, raccomandazioni di ottimizzazione, catena di più operatori stateful e ribilanciamento dello stato.
Una query di Structured Streaming con stato richiede aggiornamenti incrementali alle informazioni sullo stato intermedio, mentre una query di Structured Streaming senza stato tiene traccia solo delle informazioni riguardo alle righe elaborate dalla sorgente al sink. Per le funzionalità di ottimizzazione disponibili per le query senza stato, vedere Ottimizzare le query di streaming senza stato.
Operazioni con stato
Le operazioni con stato includono aggregazioni di streaming, distinct, dropDuplicates, join su flussi e applicazioni con stato personalizzate.
Le necessarie informazioni sullo stato intermedio per le query Stateful Structured Streaming possono causare problemi di latenza e produzione imprevedibili se non configurate correttamente.
In Databricks Runtime 13.3 LTS o versioni successive è possibile abilitare il checkpoint del registro delle modifiche con RocksDB per ridurre la durata del checkpoint e la latenza end-to-end per i carichi di lavoro di Structured Streaming. Databricks raccomanda di abilitare il checkpoint del changelog per tutte le query con stato di Structured Streaming. Vedere Abilitare il checkpoint del log delle modifiche.
Ottimizzare le query Structured Streaming con stato persistente
Databricks consiglia quanto segue per le query Structured Streaming stateful:
- Usare le istanze ottimizzate per il calcolo come ruoli di lavoro.
- Impostare il numero di partizioni di shuffle su uno o due volte il numero di core nel cluster.
- Impostare la configurazione
spark.sql.streaming.noDataMicroBatches.enabledsufalsein SparkSession. Ciò impedisce al motore di micro batch di streaming di elaborare micro batch che non contengono dati. L'impostazione di questa configurazione sufalsepotrebbe comportare anche operazioni con stato che usano filigrane o timeout di elaborazione che non ricevono l'output dei dati finché non arrivano nuovi dati anziché immediatamente.
Databricks consiglia di usare RocksDB con checkpoint del log delle modifiche per gestire lo stato per i flussi con stato. Vedere Configurare l'archivio di stato RocksDB su Azure Databricks.
Nota
Lo schema di gestione dello stato non può essere modificato tra i riavvii della query. Se una query è stata avviata con la gestione predefinita, è necessario riavviarla dall'inizio con una nuova posizione del checkpoint per cambiare l'archivio di stato.
Usare più operatori con stato in Structured Streaming
In Databricks Runtime 13.3 LTS o versione successiva, Azure Databricks offre supporto avanzato per gli operatori con stato nei carichi di lavoro di streaming strutturato. È possibile concatenare più operatori con stato, ovvero utilizzare l'output di un'operazione, come un'aggregazione finestrata, con un'altra operazione con stato, ad esempio un join.
In Databricks Runtime 16.2 o versione successiva è possibile usare transformWithState nei carichi di lavoro con più operatori con stato. Vedi Crea un'applicazione personalizzata con stato.
Gli esempi seguenti illustrano diversi modelli che è possibile usare.
Importante
Quando si usano più operatori con stato, esistono le limitazioni seguenti:
- Gli operatori con stato personalizzati legacy (
FlatMapGroupWithStateeapplyInPandasWithState) non sono supportati. - È supportata solo la modalità di output append.
Aggregazione concatenata delle finestre temporali
Pitone
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Linguaggio di programmazione Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Aggregazione per finestra temporale in due flussi diversi seguita da join per finestra tra flussi
Pitone
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Linguaggio di programmazione Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Join dell'intervallo di tempo del flusso seguito dall'aggregazione della finestra temporale
Pitone
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Linguaggio di programmazione Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Ribilanciamento dello stato per Structured Streaming
Il ribilanciamento dello stato è abilitato per impostazione predefinita per tutti i carichi di lavoro di streaming nelle pipeline dichiarative di Lakeflow Spark. In Databricks Runtime 11.3 LTS o versione successiva è possibile impostare l'opzione di configurazione seguente nella configurazione del cluster Spark per abilitare il ribilanciamento dello stato:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
Il ribilanciamento dello stato offre vantaggi per le pipeline Structured Streaming con stato che subiscono eventi di ridimensionamento del cluster. Le operazioni di streaming senza stato non traggono vantaggio, indipendentemente dalla modifica delle dimensioni del cluster.
Nota
La scalabilità automatica del calcolo ha dei limiti quando si riduce la dimensione del cluster per carichi di lavoro di streaming strutturati. Databricks consiglia di usare le Pipeline Dichiarative Spark di Lakeflow con una scalabilità automatica avanzata per i carichi di lavoro di streaming. Vedere Ottimizzare l'utilizzo del cluster delle pipeline dichiarative di Lakeflow Spark con scalabilità automatica.
Gli eventi di ridimensionamento del cluster innescano il ribilanciamento dello stato. I micro batch potrebbero avere una latenza maggiore durante gli eventi di ribilanciamento quando lo stato viene caricato dall'archiviazione cloud ai nuovi executor.