Optimera tillståndskänsliga frågor för strukturerad direktuppspelning

Genom att hantera mellanliggande tillståndsinformation för tillståndskänsliga frågor för strukturerad direktuppspelning kan du förhindra oväntade svarstider och produktionsproblem.

Databricks rekommenderar:

  • Använd beräkningsoptimerade instanser som arbetare.
  • Ange antalet shuffle-partitioner till 1–2 gånger så många kärnor i klustret.
  • Ange konfigurationen spark.sql.streaming.noDataMicroBatches.enabled till false i SparkSession. Detta hindrar den strömmande mikrobatchmotorn från att bearbeta mikrobatch som inte innehåller data. Observera också att om du ställer in den här konfigurationen på kan det leda till false tillståndskänsliga åtgärder som utnyttjar vattenstämplar eller timeouter för bearbetning för att inte hämta datautdata förrän nya data tas emot i stället för omedelbart.

Databricks rekommenderar att du använder RocksDB med kontrollpunkter för ändringsloggar för att hantera tillståndet för tillståndskänsliga strömmar. Se Konfigurera RocksDB tillståndslagring i Azure Databricks.

Kommentar

Det går inte att ändra tillståndshanteringsschemat mellan omstarter av frågor. Om en fråga har startats med standardhanteringen kan den alltså inte ändras utan att frågan startas från grunden med en ny kontrollpunktsplats.

Arbeta med flera tillståndskänsliga operatorer i strukturerad direktuppspelning

I Databricks Runtime 13.3 LTS och senare erbjuder Azure Databricks avancerat stöd för tillståndskänsliga operatörer i strukturerade strömningsarbetsbelastningar. Nu kan du länka flera tillståndskänsliga operatorer tillsammans, vilket innebär att du kan mata utdata från en åtgärd, till exempel en fönsterad aggregering till en annan tillståndskänslig åtgärd, till exempel en koppling.

I följande exempel visas flera mönster som du kan använda.

Viktigt!

Följande begränsningar finns när du arbetar med flera tillståndskänsliga operatorer:

  • FlatMapGroupWithState stöds inte.
  • Endast utdataläget för tillägg stöds.

Sammanlänkad tidsfönstersaggregering

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Aggregering av tidsfönster i två olika strömmar följt av stream-stream-fönsteranslutning

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Tidsintervallkoppling för dataström följt av aggregering av tidsfönster

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Ombalansering av tillstånd för strukturerad direktuppspelning

Tillståndsombalansering är aktiverat som standard för alla strömmande arbetsbelastningar i Delta Live Tables. I Databricks Runtime 11.3 LTS och senare kan du ange följande konfigurationsalternativ i Spark-klusterkonfigurationen för att aktivera tillståndsombalansering:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Ombalansering av tillstånd gynnar tillståndskänsliga pipelines för strukturerad direktuppspelning som genomgår klusterändringshändelser. Tillståndslösa strömningsåtgärder gynnas inte, oavsett om klusterstorlekarna ändras.

Kommentar

Automatisk skalning av beräkning har begränsningar för att skala ned klusterstorleken för arbetsbelastningar med strukturerad direktuppspelning. Databricks rekommenderar att du använder Delta Live Tables med förbättrad automatisk skalning för strömmande arbetsbelastningar. Se Optimera klusteranvändningen av Delta Live Tables-pipelines med förbättrad autoskalning.

Om du ändrar storlek på kluster kan tillståndsombalansering utlösas. Under ombalanseringshändelser kan mikrobatcherna ha högre svarstid när tillståndet läses in från molnlagring till de nya körarna.

Ange inledande tillstånd för mapGroupsWithState

Du kan ange ett användardefinierat inledande tillstånd för tillståndskänslig bearbetning med structured streaming med eller flatMapGroupsWithStatemapGroupsWithState. På så sätt kan du undvika att bearbeta data när du startar en tillståndskänslig ström utan en giltig kontrollpunkt.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Exempel på användningsfall som anger ett initialt tillstånd för operatorn flatMapGroupsWithState :

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Exempel på användningsfall som anger ett initialt tillstånd för operatorn mapGroupsWithState :

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Testa uppdateringsfunktionen mapGroupsWithState

Med TestGroupState API:et kan du testa tillståndsuppdateringsfunktionen som används för Dataset.groupByKey(...).mapGroupsWithState(...) och Dataset.groupByKey(...).flatMapGroupsWithState(...).

Tillståndsuppdateringsfunktionen tar det tidigare tillståndet som indata med hjälp av ett objekt av typen GroupState. Se referensdokumentationen för Apache Spark GroupState. Till exempel:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}