Delen via


Wat is stateful streaming?

Op deze pagina worden stateful structured streaming-queries uitgelegd, waaronder stateful bewerkingen, optimalisatieaanbevelingen, het combineren van meerdere stateful operators en herverdeling van statussen.

Voor een stateful Structured Streaming-query zijn incrementele updates van tussenliggende statusgegevens vereist, terwijl een stateless Structured Streaming-query alleen informatie bijhoudt over welke rijen van de bron naar de sink zijn verwerkt. Zie Stateless Streaming-query's optimaliseren voor optimalisatiefuncties die beschikbaar zijn voor stateless query's.

Stateful bewerkingen

Stateful bewerkingen omvatten streamingaggregatie, distinct, dropDuplicates, stream-stream-verbindingen en aangepaste stateful toepassingen.

De tussenliggende statusinformatie die is vereist voor stateful Structured Streaming-query's kan leiden tot onverwachte latentie en productieproblemen als deze onjuist zijn geconfigureerd.

In Databricks Runtime 13.3 LTS of hoger kunt u changelog-checkpointing met RocksDB inschakelen om de duur van het controlepunt en de end-to-end-latentie voor Structured Streaming-workloads te verlagen. Databricks raadt aan om controlepunten in het wijzigingenlogboek in te schakelen voor alle toestandsafhankelijke query's bij gestructureerde stream-verwerking. Zie Changelog-checkpointing inschakelen.

De stateful Structured Streaming-queries optimaliseren

Databricks raadt het volgende aan voor stateful Structured Streaming-query's:

  • Gebruik rekengeoptimaliseerde instanties als werkers.
  • Stel het aantal shuffle-partities in op tussen 1 en 2 keer het aantal kernen in het cluster.
  • Stel de spark.sql.streaming.noDataMicroBatches.enabled-configuratie in op false in SparkSession. Hiermee voorkomt u dat de streaming-microbatch-engine microbatches verwerkt die geen gegevens bevatten. Als u deze configuratie instelt op false, kan dit er ook toe leiden dat stateful bewerkingen die gebruikmaken van watermerken of verwerkingstijd-time-outs geen gegevensuitvoer produceren totdat er nieuwe gegevens binnenkomen in plaats van onmiddellijk.

Databricks raadt aan RocksDB te gebruiken met wijzigingslogboek-controlepunten voor het beheren van de status van stateful streams. Zie Opslag van RocksDB-status in Azure Databricks configureren.

Notitie

Het statusbeheerschema kan niet worden gewijzigd tussen het opnieuw opstarten van query's. Als een query is gestart met het standaardbeheer, moet u deze opnieuw starten met een nieuwe controlepuntlocatie om het statusarchief te wijzigen.

Werken met meerdere stateful operators in Structured Streaming

In Databricks Runtime 13.3 LTS of hoger biedt Azure Databricks geavanceerde ondersteuning voor stateful operators in structured streaming-workloads. U kunt meerdere stateful operators aan elkaar koppelen, wat betekent dat u de uitvoer van een bewerking, zoals een aggregatie met vensters, kunt doorvoeren naar een andere stateful bewerking, zoals een join.

In Databricks Runtime 16.2 of hoger kunt u in workloads met meerdere stateful operators gebruiken transformWithState . Zie Een custom stateful toepassing bouwen.

In de volgende voorbeelden ziet u verschillende patronen die u kunt gebruiken.

Belangrijk

De volgende beperkingen gelden voor het werken met meerdere stateful operators:

  • Verouderde aangepaste stateful operators (FlatMapGroupWithState en applyInPandasWithState) worden niet ondersteund.
  • Alleen de uitvoermodus voor toevoegen wordt ondersteund.

Aggregatie van geketende tijdvensters

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Tijdvensteraggregatie in twee verschillende stroomsegmenten gevolgd door een stroom-stroom venstersamenvoeging

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Tijdintervalkoppeling van streams gevolgd door tijdvensteraggregatie

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Statusherverdeling voor gestructureerd streamen

Statusherverdeling is standaard ingeschakeld voor alle streamingworkloads in Lakeflow Spark-declaratieve pijplijnen. In Databricks Runtime 11.3 LTS of hoger kunt u de volgende configuratieoptie instellen in de Spark-clusterconfiguratie om statusherverdeling mogelijk te maken:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

State-herverdeling komt ten goede aan stateful Structured Streaming-pijplijnen die clusteraanpassingen ondergaan. Staatloze streamingbewerkingen profiteren niet, ongeacht het wijzigen van clustergrootten.

Notitie

Het automatisch schalen heeft beperkingen bij het verkleinen van de clustergrootte voor Structured Streaming-workloads. Databricks raadt aan om declaratieve pijplijnen van Lakeflow Spark te gebruiken met verbeterde automatische schaalaanpassing voor streamingworkloads. Zie Het clustergebruik van declaratieve Pijplijnen van Lakeflow Spark optimaliseren met automatisch schalen.

Clusterresizingsgebeurtenissen activeren het herbalanceren van de status. Microbatches kunnen een hogere latentie hebben tijdens herverdelingsgebeurtenissen wanneer de status van cloudopslag naar de nieuwe uitvoerders wordt geladen.