Sdílet prostřednictvím


Co je stavové streamování?

Stavový dotaz strukturovaného streamování vyžaduje přírůstkové aktualizace průběžných informací o stavu, zatímco bezstavový dotaz strukturovaného streamování sleduje pouze informace o tom, které řádky byly zpracovány ze zdroje do jímky. Informace o funkcích optimalizace dostupných pro bezstavové dotazy najdete v tématu Optimalizace dotazů bezstavového streamování.

Stavové operace

Stavové operace zahrnují streamovací agregaci, streamovací dropDuplicates, propojení datových proudů a vlastní stavové aplikace.

Informace o přechodném stavu vyžadované pro stavové dotazy strukturovaného streamování můžou vést k neočekávaným problémům s latencí a produkčním prostředím, pokud jsou nesprávně nakonfigurované.

Ve službě Databricks Runtime 13.3 LTS a novějších můžete aktivovat kontrolu změn pomocí RocksDB, což snižuje dobu trvání kontrolního bodu a koncovou latenci pro úlohy strukturovaného streamování. Databricks doporučuje zapnout kontrolu změn pro všechny stavové dotazy u strukturovaného streamování. Viz Povolení vytváření kontrolních bodů protokolu změn.

Optimalizace stavových dotazů strukturovaného streamování

Správa průběžných informací o stavových dotazech strukturovaného streamování může pomoct zabránit neočekávaným problémům s latencí a produkčním prostředím.

Databricks doporučuje:

  • Používejte výpočetní instance jako pracovní uzly.
  • Nastavte počet oddílů náhodného prohazování na 1–2krát počet jader v clusteru.
  • Nastavte v SparkSession konfiguraci spark.sql.streaming.noDataMicroBatches.enabled na false. Tím zabráníte streamovacímu mikrodávkovému modulu zpracovávat mikrodávkové dávky, které neobsahují data. Všimněte si také, že nastavení této konfigurace na false může vést ke stavovým operacím, které používají vodní značky nebo časové limity zpracování, což může způsobit, že k výstupu dat nedojde okamžitě, ale až po příchodu nových dat.

Databricks doporučuje používat RocksDB s kontrolním bodem změnového protokolu ke správě stavu stavových streamů. Viz Konfigurace úložiště stavů RocksDB v Azure Databricks.

Poznámka:

Schéma správy stavu nelze mezi restartováními dotazů změnit. Pokud byl dotaz spuštěn s výchozí správou, musíte ho restartovat úplně od začátku s novým umístěním kontrolního bodu, aby se změnilo úložiště stavů.

Práce s několika stavovými operátory ve strukturovaném streamování

V Databricks Runtime 13.3 LTS a novějších nabízí Azure Databricks pokročilou podporu stavových operátorů v úlohách strukturovaného streamování. Teď můžete zřetězit více stavových operátorů, což znamená, že můžete výstup z operace, například z oknové agregace, předat do další stavové operace, například do spojení.

V Databricks Runtime 16.2 a novějších můžete použít transformWithState v úlohách s několika stavovými operátory. Viz Vytvoření vlastní stavové aplikace.

Následující příklady ukazují několik vzorů, které můžete použít.

Důležité

Při práci s několika stavovými operátory existují následující omezení:

  • Starší stavové operátory (FlatMapGroupWithState a applyInPandasWithState se nepodporují.
  • Podporován je pouze režim připojování výstupu.

Agregace zřetězených časových intervalů

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Agregace časového okna ve dvou různých datových proudech následovaná spojením časového okna datových proudů.

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Spojení datových proudů podle časového intervalu následované agregací v časovém okně

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Vyrovnávání stavu pro strukturované streamování

Vyrovnávání stavu je ve výchozím nastavení povolené pro všechny streamovací úlohy v deklarativních kanálech Spark Lakeflow. Ve službě Databricks Runtime 11.3 LTS a novějších můžete v konfiguraci clusteru Spark nastavit následující možnost konfigurace, abyste povolili vyrovnávání stavu:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Vyvážení stavů přináší výhody stavovým zpracovatelským kanálům strukturovaného streamování, které procházejí událostmi změny velikosti clusteru. Bezstavové streamovací operace nemají žádný prospěch bez ohledu na to, jak se mění velikosti clusteru.

Poznámka:

Automatické škálování má omezení při snižování velikosti clusteru pro úlohy strukturovaného streamování. Databricks doporučuje používat deklarativní kanály Sparku Lakeflow s vylepšeným automatickým škálováním pro úlohy streamování. Viz Optimalizace využití clusteru deklarativních kanálů Sparku Lakeflow pomocí automatického škálování.

Události změny velikosti clusteru spouští vyrovnávání stavu. Mikrodávky můžou mít při událostech vyvažování vyšší latenci, protože se stav načítá z cloudového úložiště do nových výkonných jednotek.