Condividi tramite


Che cos'è lo streaming con stato?

Una query Structured Streaming con stato richiede aggiornamenti incrementali alle informazioni sullo stato intermedio, mentre una query Structured Streaming senza stato tiene traccia solo delle informazioni sulle righe elaborate dall'origine al sink.

Le operazioni con stato includono aggregazioni di streaming dropDuplicates, streaming, join stream-stream, mapGroupsWithState, e flatMapGroupsWithState.

Le informazioni di stato intermedie delle query di Structured Streaming con stato possono causare problemi di latenza e produzione imprevisti se non configurate correttamente.

In Databricks Runtime 13.3 LTS e versioni successive, è possibile abilitare il checkpoint del log delle modifiche con RocksDB per ridurre la durata del checkpoint e la latenza end-to-end per i carichi di lavoro di Structured Streaming. Databricks raccomanda di abilitare il checkpoint del log delle modifiche per tutte le query con stato di Structured Streaming. Vedere Abilitare il checkpoint del log delle modifiche.

Ottimizzare le query con stato Structured Streaming

La gestione delle informazioni di stato intermedie delle query di Structured Streaming con stato può contribuire ad evitare problemi di latenza e produzione imprevisti.

Databricks consiglia di:

  • Usare le istanze ottimizzate per il calcolo come ruoli di lavoro.
  • Impostare il numero di partizioni casuali su 1-2 volte il numero di core nel cluster.
  • Impostare la spark.sql.streaming.noDataMicroBatches.enabled configurazione su false in SparkSession. Ciò impedisce al modulo micro batch di streaming di elaborare micro batch che non contengono dati. Si noti anche che l'impostazione di questa configurazione su false potrebbe comportare operazioni con stato che sfruttano limiti o timeout di elaborazione per non ottenere l'output dei dati fino a quando non arrivano nuovi dati anziché immediatamente.

Databricks consiglia di usare RocksDB con checkpoint del log delle modifiche per gestire lo stato per i flussi con stato. Vedere Configurare l'archivio di stato RocksDB su Azure Databricks.

Nota

Non è possibile modificare lo schema di gestione dello stato tra i riavvii della query. Ovvero, se una query è stata avviata con la gestione predefinita, non può essere modificata senza avviare la query da zero con un nuovo percorso del checkpoint.

Usare più operatori con stato in Structured Streaming

In Databricks Runtime 13.3 LTS e versioni successive Azure Databricks offre supporto avanzato per gli operatori con stato nei carichi di lavoro Structured Streaming. È ora possibile concatenare più operatori con stato, vale a dire che è possibile inserire l'output di un'operazione, ad esempio un'aggregazione finestrata a un'altra operazione con stato, ad esempio un join.

Gli esempi seguenti illustrano diversi modelli che è possibile usare.

Importante

Quando si usano più operatori con stato, esistono le limitazioni seguenti:

  • FlatMapGroupWithState non è supportata.
  • È supportata solo la modalità di output Append.

Aggregazione dell'intervallo di tempo concatenato

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Aggregazione dell'intervallo di tempo in due flussi diversi seguiti dal join di finestre stream-stream

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Join dell'intervallo di tempo stream-stram seguito dall'aggregazione dell'intervallo di tempo

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Ribilanciamento dello stato per Structured Streaming

Il ribilanciamento dello stato è abilitato per impostazione predefinita per tutti i carichi di lavoro di streaming nelle tabelle Delta Live. In Databricks Runtime 11.3 LTS e versioni successive è possibile impostare l'opzione di configurazione seguente nella configurazione del cluster Spark per abilitare il ribilanciamento dello stato:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Il ribilanciamento dello stato offre vantaggi per le pipeline Structured Streaming con stato che subiscono eventi di ridimensionamento del cluster. Le operazioni di streaming senza stato non traggono vantaggio, indipendentemente dalla modifica delle dimensioni del cluster.

Nota

La scalabilità automatica del calcolo ha dei limiti quando si riduce la dimensione del cluster per carichi di lavoro di streaming strutturati. Databricks consiglia di usare tabelle live Delta con scalabilità automatica avanzata per i carichi di lavoro di streaming. Vedere Ottimizzare l'utilizzo del cluster delle pipeline di tabelle live Delta con scalabilità automatica avanzata.

Il ridimensionamento del cluster causa il ribilanciamento dello stato. Durante gli eventi di ribilanciamento, i micro batch potrebbero avere una latenza maggiore quando lo stato viene caricato dalla memorizzazione cloud ai nuovi executor.

Specificare lo stato iniziale per mapGroupsWithState

È possibile specificare uno stato iniziale definito dall'utente per l'elaborazione con stato Structured Streaming usando flatMapGroupsWithStateo mapGroupsWithState. Ciò consente di evitare di rielaborare i dati quando si avvia un flusso con stato senza un checkpoint valido.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Esempio di caso d'uso che specifica uno stato iniziale per l'flatMapGroupsWithStateoperatore:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Esempio di caso d'uso che specifica uno stato iniziale per l'mapGroupsWithStateoperatore:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Test della mapGroupsWithState funzione di aggiornamento

L'TestGroupStateAPI consente di testare la funzione di aggiornamento dello stato usata per Dataset.groupByKey(...).mapGroupsWithState(...) e Dataset.groupByKey(...).flatMapGroupsWithState(...).

La funzione di aggiornamento dello stato accetta lo stato precedente come input usando un oggetto di tipo GroupState. Vedere la documentazione di riferimento di Apache Spark GroupState. Ad esempio:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}