Optimera tillståndskänsliga frågor för strukturerad direktuppspelning
Genom att hantera mellanliggande tillståndsinformation för tillståndskänsliga frågor för strukturerad direktuppspelning kan du förhindra oväntade svarstider och produktionsproblem.
Databricks rekommenderar:
- Använd beräkningsoptimerade instanser som arbetare.
- Ange antalet shuffle-partitioner till 1–2 gånger så många kärnor i klustret.
- Ange konfigurationen
spark.sql.streaming.noDataMicroBatches.enabled
tillfalse
i SparkSession. Detta hindrar den strömmande mikrobatchmotorn från att bearbeta mikrobatch som inte innehåller data. Observera också att om du ställer in den här konfigurationen på kan det leda tillfalse
tillståndskänsliga åtgärder som utnyttjar vattenstämplar eller timeouter för bearbetning för att inte hämta datautdata förrän nya data tas emot i stället för omedelbart.
Databricks rekommenderar att du använder RocksDB med kontrollpunkter för ändringsloggar för att hantera tillståndet för tillståndskänsliga strömmar. Se Konfigurera RocksDB tillståndslagring i Azure Databricks.
Kommentar
Det går inte att ändra tillståndshanteringsschemat mellan omstarter av frågor. Om en fråga har startats med standardhanteringen kan den alltså inte ändras utan att frågan startas från grunden med en ny kontrollpunktsplats.
Arbeta med flera tillståndskänsliga operatorer i strukturerad direktuppspelning
I Databricks Runtime 13.3 LTS och senare erbjuder Azure Databricks avancerat stöd för tillståndskänsliga operatörer i strukturerade strömningsarbetsbelastningar. Nu kan du länka flera tillståndskänsliga operatorer tillsammans, vilket innebär att du kan mata utdata från en åtgärd, till exempel en fönsterad aggregering till en annan tillståndskänslig åtgärd, till exempel en koppling.
I följande exempel visas flera mönster som du kan använda.
Viktigt!
Följande begränsningar finns när du arbetar med flera tillståndskänsliga operatorer:
FlatMapGroupWithState
stöds inte.- Endast utdataläget för tillägg stöds.
Sammanlänkad tidsfönstersaggregering
Python
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Aggregering av tidsfönster i två olika strömmar följt av stream-stream-fönsteranslutning
Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Tidsintervallkoppling för dataström följt av aggregering av tidsfönster
Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Ombalansering av tillstånd för strukturerad direktuppspelning
Tillståndsombalansering är aktiverat som standard för alla strömmande arbetsbelastningar i Delta Live Tables. I Databricks Runtime 11.3 LTS och senare kan du ange följande konfigurationsalternativ i Spark-klusterkonfigurationen för att aktivera tillståndsombalansering:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
Ombalansering av tillstånd gynnar tillståndskänsliga pipelines för strukturerad direktuppspelning som genomgår klusterändringshändelser. Tillståndslösa strömningsåtgärder gynnas inte, oavsett om klusterstorlekarna ändras.
Kommentar
Automatisk skalning av beräkning har begränsningar för att skala ned klusterstorleken för arbetsbelastningar med strukturerad direktuppspelning. Databricks rekommenderar att du använder Delta Live Tables med förbättrad automatisk skalning för strömmande arbetsbelastningar. Se Optimera klusteranvändningen av Delta Live Tables-pipelines med förbättrad autoskalning.
Om du ändrar storlek på kluster kan tillståndsombalansering utlösas. Under ombalanseringshändelser kan mikrobatcherna ha högre svarstid när tillståndet läses in från molnlagring till de nya körarna.
Ange inledande tillstånd för mapGroupsWithState
Du kan ange ett användardefinierat inledande tillstånd för tillståndskänslig bearbetning med structured streaming med eller flatMapGroupsWithState
mapGroupsWithState
. På så sätt kan du undvika att bearbeta data när du startar en tillståndskänslig ström utan en giltig kontrollpunkt.
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
Exempel på användningsfall som anger ett initialt tillstånd för operatorn flatMapGroupsWithState
:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Exempel på användningsfall som anger ett initialt tillstånd för operatorn mapGroupsWithState
:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Testa uppdateringsfunktionen mapGroupsWithState
Med TestGroupState
API:et kan du testa tillståndsuppdateringsfunktionen som används för Dataset.groupByKey(...).mapGroupsWithState(...)
och Dataset.groupByKey(...).flatMapGroupsWithState(...)
.
Tillståndsuppdateringsfunktionen tar det tidigare tillståndet som indata med hjälp av ett objekt av typen GroupState
. Se referensdokumentationen för Apache Spark GroupState. Till exempel:
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}