Wat is stateful streaming?
Voor een stateful Structured Streaming-query zijn incrementele updates van tussenliggende statusgegevens vereist, terwijl een stateless Structured Streaming-query alleen informatie bijhoudt over welke rijen van de bron naar de sink zijn verwerkt.
Stateful bewerkingen omvatten streamingaggregatie, streaming, stream-stream dropDuplicates
joins, mapGroupsWithState
en flatMapGroupsWithState
.
De tussenliggende statusinformatie die is vereist voor stateful structured streaming-query's kan leiden tot onverwachte latentie en productieproblemen als deze niet goed zijn geconfigureerd.
In Databricks Runtime 13.3 LTS en hoger kunt u changelog-controlepunten met RocksDB inschakelen om de duur van het controlepunt en de end-to-end-latentie voor structured streaming-workloads te verlagen. Databricks raadt aan controlepunten in het wijzigingenlogboek in te schakelen voor alle stateful query's voor gestructureerd streamen. Zie Controlepunten voor wijzigingenlogboek inschakelen.
Stateful structured streaming-query's optimaliseren
Het beheren van de informatie over de tussenliggende status van stateful structured streaming-query's kan helpen bij het voorkomen van onverwachte latentie en productieproblemen.
Databricks raadt het volgende aan:
- Gebruik voor rekenkracht geoptimaliseerde exemplaren als werkrollen.
- Stel het aantal willekeurige partities in op 1-2 keer aantal kernen in het cluster.
- Stel de
spark.sql.streaming.noDataMicroBatches.enabled
configuratiefalse
in op in SparkSession. Hiermee voorkomt u dat de streaming-microbatch-engine microbatches verwerkt die geen gegevens bevatten. Houd er ook rekening mee dat het instellen van deze configuratiefalse
kan leiden tot stateful bewerkingen die gebruikmaken van time-outs voor watermerken of verwerkingstime-outs om geen gegevensuitvoer te krijgen totdat nieuwe gegevens binnenkomen in plaats van onmiddellijk.
Databricks raadt het gebruik van RocksDB aan met controlepunten voor wijzigingenlogboeken om de status voor stateful stromen te beheren. Zie Opslag van RocksDB-status in Azure Databricks configureren.
Notitie
Het statusbeheerschema kan niet worden gewijzigd tussen het opnieuw opstarten van de query. Als een query is gestart met het standaardbeheer, kan deze dus niet worden gewijzigd zonder de query helemaal opnieuw te starten met een nieuwe controlepuntlocatie.
Werken met meerdere stateful operators in Structured Streaming
In Databricks Runtime 13.3 LTS en hoger biedt Azure Databricks geavanceerde ondersteuning voor stateful operators in structured streaming-workloads. U kunt nu meerdere stateful operators samenvoegen, wat betekent dat u de uitvoer van een bewerking, zoals een gevensterde aggregatie, kunt doorvoeren naar een andere stateful bewerking, zoals een join.
In de volgende voorbeelden ziet u verschillende patronen die u kunt gebruiken.
Belangrijk
De volgende beperkingen gelden voor het werken met meerdere stateful operators:
FlatMapGroupWithState
wordt niet ondersteund.- Alleen de uitvoermodus voor toevoegen wordt ondersteund.
Aggregatie van geketende tijdvensters
Python
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Tijdvensteraggregatie in twee verschillende streams gevolgd door stream-stream window join
Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Tijdsinterval voor streamstreaming gevolgd door tijdvensteraggregatie
Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Statusherverdeling voor gestructureerd streamen
Statusherverdeling is standaard ingeschakeld voor alle streamingworkloads in Delta Live Tables. In Databricks Runtime 11.3 LTS en hoger kunt u de volgende configuratieoptie instellen in de Configuratie van het Spark-cluster om statusherverdeling mogelijk te maken:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
Statusherverdeling biedt stateful Structured Streaming-pijplijnen die het formaat van het cluster wijzigen. Staatloze streamingbewerkingen profiteren niet, ongeacht het wijzigen van clustergrootten.
Notitie
Automatisch schalen van berekeningen heeft beperkingen bij het omlaag schalen van clustergrootte voor structured streaming-workloads. Databricks raadt het gebruik van Delta Live Tables aan met verbeterde automatische schaalaanpassing voor streamingworkloads. Zie Het clustergebruik van Delta Live Tables-pijplijnen optimaliseren met verbeterde automatische schaalaanpassing.
Het wijzigen van het formaat van clusters zorgt ervoor dat statusherverdeling wordt geactiveerd. Tijdens het opnieuw verdelen van gebeurtenissen kunnen microbatches een hogere latentie hebben wanneer de status van cloudopslag naar de nieuwe uitvoerders wordt geladen.
Geef de initiële status op voor mapGroupsWithState
U kunt een door de gebruiker gedefinieerde initiële status opgeven voor structured streaming stateful verwerking met behulp van flatMapGroupsWithState
of mapGroupsWithState
. Hierdoor kunt u voorkomen dat gegevens opnieuw worden verwerkt bij het starten van een stateful stream zonder een geldig controlepunt.
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
Voorbeeld van use case die een initiële status aangeeft aan de flatMapGroupsWithState
operator:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Voorbeeld van use case die een initiële status aangeeft aan de mapGroupsWithState
operator:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
mapGroupsWithState
De updatefunctie testen
Met TestGroupState
de API kunt u de functie voor statusupdates testen die wordt gebruikt voor Dataset.groupByKey(...).mapGroupsWithState(...)
en Dataset.groupByKey(...).flatMapGroupsWithState(...)
.
De functie statusupdate heeft de vorige status als invoer met behulp van een object van het type GroupState
. Raadpleeg de referentiedocumentatie voor Apache Spark GroupState. Voorbeeld:
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}