Not
Åtkomst till denna sida kräver auktorisation. Du kan prova att logga in eller byta katalog.
Åtkomst till denna sida kräver auktorisation. Du kan prova att byta katalog.
Viktig
Den här funktionen finns i offentlig förhandsversion i Databricks Runtime 16.2 och senare.
Du kan skapa strömmande program med hjälp av anpassade tillståndskänsliga operatorer för att implementera lösningar med låg latens och nära realtid som använder godtycklig tillståndskänslig logik. Anpassade tillståndskänsliga operatorer låser upp nya användningsfall och mönster som inte är tillgängliga via traditionell bearbetning av strukturerad direktuppspelning.
Anteckning
Databricks rekommenderar att du använder inbyggd Structured Streaming-funktionalitet för tillståndskänsliga operationer som aggregeringar, deduplicering och strömningssammanslagningar där dessa stöds. Se Vad är tillståndskänslig strömning?.
Databricks rekommenderar att du använder transformWithState över äldre operatorer för godtyckliga tillståndstransformeringar. Dokumentation om äldre operatorer för flatMapGroupsWithState och mapGroupsWithState finns i Äldre godtyckliga tillståndskänsliga operatorer.
Krav
transformWithState-operatorn och de relaterade API:erna och klasserna har följande krav:
- Tillgänglig i Databricks Runtime 16.2 och senare.
- Beräkning måste använda dedikerat eller icke-isoleringsläge, förutom att standardåtkomstläget stöds för Python (
transformWithStateInPandas) i Databricks Runtime 16.3 och senare och för Scala (transformWithState) i Databricks Runtime 17.3 och senare. - Du måste använda RocksDB-tillståndslagerleverantören. Databricks rekommenderar att du aktiverar RocksDB som en del av beräkningskonfigurationen.
Anteckning
Om du vill aktivera RocksDB-tillståndslagerprovidern för den aktuella sessionen kör du följande:
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
Vad är transformWithState?
Operatorn transformWithState tillämpar en anpassad tillståndskänslig processor på en fråga för strukturerad direktuppspelning. Du måste implementera en anpassad tillståndskänslig processor för att kunna använda transformWithState. Strukturerad direktuppspelning innehåller API:er för att skapa en tillståndskänslig processor med Python, Scala eller Java.
Du använder transformWithState för att tillämpa anpassad logik på en grupperingsnyckel för poster som bearbetas stegvis med strukturerad direktuppspelning. Följande beskriver designen på hög nivå:
- Definiera en eller flera tillståndsvariabler.
- Tillståndsinformation sparas för varje grupperingsnyckel och kan nås för varje tillståndsvariabel enligt användardefinierad logik.
- För varje bearbetad mikrogrupp är alla poster för den givna nyckeln tillgängliga som en iterator.
- Använd inbyggda kontroller för att styra när och hur poster avges baserat på timers och användardefinierade villkor.
- Tillståndsvärden har stöd för enskilda TTL-definitioner (time-to-live), vilket ger flexibilitet när det gäller att hantera tillståndsförfallodatum och tillståndsstorlek.
Eftersom transformWithState stöder schemautveckling i tillståndsarkivet kan du iterera och uppdatera dina produktionsprogram utan att förlora historisk tillståndsinformation eller behöva bearbeta poster igen, vilket ger flexibilitet för utveckling och enkelt underhåll. Se även Schemautveckling i tillståndslagret.
Viktig
PySpark använder operatorn transformWithStateInPandas i stället för transformWithState. Azure Databricks-dokumentationen använder transformWithState för att beskriva funktioner för både Python- och Scala-implementeringar.
Scala- och Python-implementeringarna av transformWithState och relaterade API:er skiljer sig åt på grund av språkspecifika egenskaper men har samma funktioner. Se språkspecifika exempel och API-dokumentation för önskat programmeringsspråk.
Inbyggda bearbetningshandtag
Du implementerar kärnlogik för ditt anpassade tillståndskänsliga program genom att implementera -hanterare med hjälp av inbyggda hanterar.
- Hantera ger metoder för att interagera med tillståndsvärden och timers, bearbeta inkommande poster och skicka poster.
- Hanterare definierar din anpassade händelsedrivna logik.
Handtag för varje tillståndstyp implementeras baserat på den underliggande datastrukturen, men var och en innehåller funktioner för att hämta, placera, uppdatera och ta bort poster.
Hanterare implementeras baserat på antingen händelser som observerats i indataposter eller timers med hjälp av följande semantik:
- Definiera en hanterare med hjälp av metoden
handleInputRowsför att styra hur data bearbetas, tillståndet uppdateras och poster genereras för varje mikrobatch med poster som bearbetas för grupperingsnyckeln. Se att hantera indatarader. - Definiera en hanterare med hjälp av metoden
handleExpiredTimerför att använda tidsbaserade tröskelvärden för att köra logik oavsett om ytterligare poster bearbetas för grupperingsnyckeln eller inte. Se Programmets tidsinställda händelser.
Följande tabell har en jämförelse av funktionella beteenden som stöds av dessa hanterare:
| Uppförande | handleInputRows |
handleExpiredTimer |
|---|---|---|
| Hämta, placera, uppdatera eller rensa tillståndsvärden | Ja | Ja |
| Skapa eller ta bort en timer | Ja | Ja |
| Generera poster | Ja | Ja |
| Iterera över dataposter i det aktuella mikropartiet | Ja | Nej |
| Utlösarlogik baserat på förfluten tid | Nej | Ja |
Du kan kombinera handleInputRows och handleExpiredTimer för att implementera komplex logik efter behov.
Du kan till exempel implementera ett program som använder handleInputRows för att uppdatera tillståndsvärden för varje mikrobatch och ange en timer på 10 sekunder i framtiden. Om inga ytterligare poster bearbetas kan du använda handleExpiredTimer för att generera aktuella värden i tillståndslagret. Om nya poster bearbetas för grupperingsnyckeln kan du rensa den befintliga timern och ställa in en ny timer.
Anpassade tillståndstyper
Du kan implementera flera tillståndsobjekt i en enda tillståndskänslig operator. Namnen som du ger till varje tillståndsobjekt finns kvar i tillståndsarkivet, som du kan komma åt med tillståndsarkivläsaren. Om ditt tillståndsobjekt använder en StructTypeanger du namn för varje fält i structen när schemat skickas. Dessa namn visas också när du läser tillståndslagringen. Se läsa information om status för strukturerad direktuppspelning.
Funktionerna som tillhandahålls av inbyggda klasser och operatorer är avsedda att ge flexibilitet och utökningsbarhet, och implementeringsalternativ bör informeras av den fullständiga logik som programmet behöver köra. Du kan till exempel implementera nästan identisk logik med hjälp av en ValueState grupperad efter fält user_id och session_id eller en MapState grupperad efter user_id där session_id är nyckeln för MapState. I det här fallet kan en MapState vara den föredragna implementeringen om logiken behöver utvärdera villkor över flera session_ids.
I följande avsnitt beskrivs de tillståndstyper som stöds av transformWithState.
ValueState
För varje grupperingsnyckel finns det ett associerat värde.
Ett värde kan innehålla komplexa typer, till exempel en struct eller en tupl. När du uppdaterar en ValueStateimplementerar du logik för att ersätta hela värdet. TTL för ett värdetillstånd återställs när värdet uppdateras men återställs inte om en källnyckel som matchar en ValueState bearbetas utan att den lagrade ValueStateuppdateras.
ListState
För varje grupperingsnyckel finns det en associerad lista.
Ett listtillstånd är en samling värden som var och en kan innehålla komplexa typer. Varje värde i en lista har sin egen TTL. Du kan lägga till objekt i en lista genom att lägga till enskilda objekt, lägga till en lista med objekt eller skriva över hela listan med en put. Endast put-åtgärden anses vara en uppdatering för återställning av TTL.
MapState
För varje grupperingsnyckel finns det en associerad karta. Maps är den funktionella motsvarigheten i Apache Spark till en Python-dictionary.
Viktig
Grupperingsnycklar beskriver de fält som anges i GROUP BY-satsen i frågan Strukturerad direktuppspelning. Mappobjekt innehåller ett godtyckligt antal nyckel-värdepar för en grupperingsnyckel.
Om du till exempel grupperar efter user_id och vill definiera en karta för varje session_idär din grupperingsnyckel user_id och nyckeln på kartan är session_id.
Ett karttillstånd är en samling distinkta nycklar, där varje nyckel är kopplad till ett värde som kan inkludera komplexa typer. Varje nyckel/värde-par på en karta har sin egen TTL. Du kan uppdatera värdet för en specifik nyckel eller ta bort en nyckel och dess värde. Du kan returnera ett enskilt värde med hjälp av dess nyckel, visa alla nycklar, visa alla värden eller returnera en iterator för att arbeta med den fullständiga uppsättningen nyckel/värde-par på kartan.
Initiera en anpassad tillståndsvariabel
När du initierar din StatefulProcessorskapar du en lokal variabel för varje tillståndsobjekt som gör att du kan interagera med tillståndsobjekt i din anpassade logik. Tillståndsvariabler definieras och initieras genom att åsidosätta den inbyggda init-metoden i klassen StatefulProcessor.
Du definierar en godtycklig mängd tillståndsobjekt med hjälp av metoderna getValueState, getListStateoch getMapState när du initierar StatefulProcessor.
Varje tillståndsobjekt måste ha följande:
- Ett unikt namn
- Ett schema har angetts
- I Python specificeras schemat explicit.
- I Scala skickar du en
Encoderför att ange tillståndsschema.
Du kan också ange en valfri TTL-varaktighet (time to live) i millisekunder. Om du implementerar ett karttillstånd måste du ange en separat schemadefinition för kartnycklarna och värdena.
Anteckning
Logik för hur tillståndsinformation efterfrågas, uppdateras och genereras hanteras separat. Se Använd dina tillståndsvariabler.
Exempel på tillståndskänsligt program
Följande visar den grundläggande syntaxen för att definiera och använda en anpassad tillståndskänslig processor med transformWithState, inklusive exempeltillståndsvariabler för varje typ som stöds. Fler exempel finns i Exempel på tillståndskänsliga program.
Anteckning
Python använder tupplar för alla interaktioner med tillståndsvärden. Det innebär att Python-kod ska skicka värden med tupplar när du använder åtgärder som put och update och förväntas hantera tupplar när du använder get.
Om schemat för ditt värdetillstånd till exempel bara är ett heltal implementerar du kod som följande:
current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0] # Extracts the first item in the tuple
new_value = current_value + 1 # Calculate a new value
value_state.update((new_value,)) # Pass the new value formatted as a tuple
Detta gäller även för objekt i en ListState eller värden i en MapState.
python
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
# Schema can also be defined using strings and SQL DDL syntax
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
count = 0
for pdf in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
pdf_count = pdf.count()
count += pdf_count.get("value")
self.value_state.update((count,)) # Count is passed as a tuple
iter = self.list_state.get()
list_state_value = next(iter1)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield pd.DataFrame({"id": key, "countAsString": str(count)})
q = (df.groupBy("key")
.transformWithStateInPandas(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
Scala
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
@transient private var countState: ValueState[Int] = _
@transient private var listState: ListState[Int] = _
@transient private var mapState: MapState[String, Int] = _
override def init(
outputMode: OutputMode,
timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState",
Encoders.scalaLong, TTLConfig.NONE)
listState = getHandle.getListState[Int]("listState",
Encoders.scalaInt, TTLConfig.NONE)
mapState = getHandle.getMapState[String, Int]("mapState",
Encoders.STRING, Encoders.scalaInt, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
var count = countState.getOption().getOrElse(0)
for (row <- inputRows) {
val listData = Array(120, 20)
listState.put(listData)
listState.appendValue(count)
listState.appendList(listData)
count += 1
}
val iter = listState.get()
var listStateValue = 0
if (iter.hasNext) {
listStateValue = iter.next()
}
countState.update(count)
var value = count
val userKey = "userKey"
if (mapState.exists()) {
if (mapState.containsKey(userKey)) {
value += mapState.getValue(userKey)
}
}
mapState.updateValue(userKey, value)
Iterator((key, count.toString))
}
}
val q = spark
.readStream
.format("delta")
.load("$srcDeltaTableDir")
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new SimpleCounterProcessor(),
TimeMode.None(),
OutputMode.Update(),
)
.writeStream...
StatefulProcessorHandle
PySpark innehåller klassen StatefulProcessorHandle för att ge åtkomst till funktioner som styr hur din användardefinierade Python-kod interagerar med tillståndsinformation. Du måste alltid importera och skicka StatefulProcessorHandle till variabeln handle när du initierar en StatefulProcessor.
Variabeln handle kopplar den lokala variabeln i Python-klassen till tillståndsvariabeln.
Anteckning
Scala använder metoden getHandle.
Ange inledande tillstånd
Du kan också ange ett initialt tillstånd att använda med den första mikrobatchen. Detta kan vara användbart när du migrerar ett befintligt arbetsflöde till ett nytt anpassat program, uppgraderar en tillståndskänslig operatör för att ändra schemat eller logiken eller reparerar ett fel som inte kan repareras automatiskt och kräver manuella åtgärder.
Anteckning
Använd tillståndsarkivläsaren för att fråga efter tillståndsinformation från en befintlig kontrollpunkt. Se läsa information om status för strukturerad direktuppspelning.
Om du konverterar en befintlig Delta-tabell till ett tillståndskänsligt program läser du tabellen med hjälp av spark.read.table("table_name") och skickar den resulterande DataFrame.If you are converting an existing Delta table to a stateful application, read the table using spark.read.table("table_name") and pass the resulting DataFrame. Du kan också välja eller ändra fält så att de överensstämmer med ditt nya tillståndskänsliga program.
Du anger ett initialt tillstånd med hjälp av en DataFrame med samma grupperingsnyckelschema som indataraderna.
Anteckning
Python använder handleInitialState för att ange det inledande tillståndet när du definierar en StatefulProcessor. Scala använder den distinkta klassen StatefulProcessorWithInitialState.
Använd dina tillståndsvariabler
Tillståndsobjekt som stöds tillhandahåller metoder för att hämta tillstånd, uppdatera befintlig tillståndsinformation eller rensa det aktuella tillståndet. Varje tillståndstyp som stöds har en unik implementering av metoder som motsvarar den datastruktur som implementeras.
Varje grupperingsnyckel som observeras har dedikerad tillståndsinformation.
- Data genereras baserat på den logik som du implementerar och med hjälp av det utdataschema som du anger. Se Emit-poster.
- Du kan komma åt värden i tillståndsarkivet, med hjälp av läsaren
statestore. Den här läsaren har batchfunktioner och är inte avsedd för arbetsbelastningar med låg latens. Se läsa information om status för strukturerad direktuppspelning. - Logik som anges med
handleInputRowsutlöses endast om poster för nyckeln finns i en mikrobatch. Se att hantera indatarader. - Använd
handleExpiredTimerför att implementera tidsbaserad logik som inte är beroende av att observera poster för att aktiveras. Se Programmets tidsinställda händelser.
Anteckning
Tillståndsobjekt isoleras genom gruppering av nycklar med följande konsekvenser:
- Tillståndsvärden kan inte påverkas av poster som är associerade med en annan grupperingsnyckel.
- Du kan inte implementera logik som är beroende av att jämföra värden eller uppdatera tillstånd mellan grupperingsnycklar.
Du kan jämföra värden i en grupperingsnyckel. Använd en MapState för att implementera logik med en andra nyckel som din anpassade logik kan använda. Om du till exempel grupperar efter user_id och nyckelar din MapState med hjälp av IP-adressen kan du implementera logik som spårar samtidiga användarsessioner.
Avancerade överväganden för att arbeta med state
När du skriver till en tillståndsvariabel utlöses en skrivning till RocksDB. För optimerad prestanda rekommenderar Databricks att du bearbetar alla värden i iteratorn för en viss nyckel och genomför uppdateringar i en enda skrivning när det är möjligt.
Anteckning
Tillståndsuppdateringar är feltoleranta. Om en aktivitet kraschar innan en mikrobatch har slutfört bearbetningen används värdet från den senaste lyckade mikrobatchen vid återförsök.
Tillståndsvärden har inga inbyggda standardvärden. Om logiken kräver att du läser befintlig tillståndsinformation använder du metoden exists när du implementerar logiken.
Anteckning
MapState variabler har ytterligare funktioner för att söka efter enskilda nycklar eller lista alla nycklar för att implementera logik för null-tillstånd.
Generera poster
Användardefinierad logik styr hur transformWithState genererar poster. Poster utsläpps för varje grupperingsnyckel.
Skräddarsydda tillståndsbaserade program gör inga förutsättningar om hur tillståndsinformation används vid fastställandet av hur poster ska utsändas, och antalet returnerade poster för ett givet villkor kan vara inget, ett eller flera.
Du implementerar logik för att emittera poster med antingen handleInputRows eller handleExpiredTimer. Se Hantera indatarader och Program-tidsindelade händelser.
Anteckning
Du kan implementera flera tillståndsvärden och definiera flera villkor för att generera poster, men alla poster som genereras bör använda samma schema.
python
I Python definierar du utdataschemat med nyckelordet outputStructType när du anropar transformWithStateInPandas.
Du genererar poster med hjälp av ett pandas DataFrame-objekt och yield.
Du kan valfritt yield en tom DataFrame. När den kombineras med utdataläge update uppdaterar en tom DataFrame värdena för grupperingsnyckeln till null.
Scala
I Scala genererar du poster med hjälp av ett Iterator objekt. Schemat för utdata härleds från avgivna poster.
Du kan välja att generera en tom Iterator. När update utdataläge kombineras och en tom Iterator sänds, uppdateras värdena för grupperingsnyckeln till att bli null.
Hantera indatarader
Använd handleInputRows-metoden för att definiera logiken för hur poster som observerats i din strömfråga interagerar med och uppdaterar tillståndsvärden. Hanteraren som du definierar med metoden handleInputRows körs varje gång alla poster bearbetas via din fråga för strukturerad direktuppspelning.
För de flesta tillståndskänsliga program som implementeras med transformWithStatedefinieras kärnlogik med hjälp av handleInputRows.
För varje mikrobatchuppdatering som bearbetas är alla poster i mikrobatchen för en viss grupperingsnyckel tillgängliga med hjälp av en iterator. Användardefinierad logik kan interagera med alla poster från den aktuella mikrobatchen och värdena i tillståndsarkivet.
Program för tidsinställda händelser
Du kan använda timers för att implementera anpassad logik baserat på förfluten tid från ett angivet villkor.
Du arbetar med tidsmätare genom att implementera en handleExpiredTimer-metod.
Inom en grupperingsnyckel identifieras timers unikt av tidsstämpeln.
När en timer upphör att gälla bestäms resultatet av logiken som implementeras i ditt program. Vanliga mönster är:
- Genererar information som lagras i en tillståndsvariabel.
- Rensa lagrad tillståndsinformation.
- Skapa en ny timer.
Utgångna timers utlöses även om inga poster för deras associerade nyckel bearbetas i en mikrobatch.
Ange tidsmodellen
När du skickar StatefulProcessor till transformWithStatemåste du ange tidsmodellen. Följande alternativ stöds:
ProcessingTimeEventTime-
NoTimeellerTimeMode.None()
Att ange NoTime innebär att timers inte stöds för processorn.
Inbyggda timervärden
Databricks rekommenderar att du inte anropar systemklockan i ditt anpassade tillståndskänsliga program, eftersom detta kan leda till otillförlitliga återförsök vid aktivitetsfel. Använd metoderna i klassen TimerValues när du måste komma åt bearbetningstiden eller vattenstämpeln:
TimerValues |
Beskrivning |
|---|---|
getCurrentProcessingTimeInMs |
Returnerar tidsstämpeln för bearbetningstiden för den aktuella batchen i millisekunder sedan epoken. |
getCurrentWatermarkInMs |
Returnerar tidsstämpeln för den aktuella batchens vattenstämpel i millisekunder sedan epok. |
Anteckning
Bearbetningstiden beskriver den tid då mikrobatchen bearbetas av Apache Spark. Många strömmande källor, till exempel Kafka, inkluderar även systembearbetningstid.
Vattenstämplar i strömmande sökfrågor definieras ofta mot evenemangstid eller bearbetningstid för den strömmande källan. Se Använd vattenstämplar för att kontrollera tröskelvärden för databehandling.
Både vattenstämplar och fönster kan användas i kombination med transformWithState. Du kan implementera liknande funktioner i ditt anpassade tillståndskänsliga program genom att använda TTL, timers och MapState eller ListState funktioner.
Vad är tillståndstid att leva (TTL)?
Tillståndsvärdena som används av transformWithState har stöd för en valfri TTL-specifikation (time to live). När TTL går ut tas värdet bort från tillståndslagringen. TTL interagerar bara med värden i tillståndsarkivet, vilket innebär att du kan implementera logik för att avlägsna tillståndsinformation, men du kan inte direkt utlösa logik när TTL tar bort tillståndsvärden.
Viktig
Om du inte implementerar TTL måste du hantera tillståndsavhysning med hjälp av annan logik för att undvika oändlig tillståndstillväxt.
TTL tillämpas för varje tillståndsvärde, med olika regler för varje tillståndstyp.
- Tillståndsvariabler är begränsade till grupperingsnycklar.
- För
ValueStateobjekt lagras endast ett enda värde per grupperingsnyckel. TTL gäller för det här värdet. - För
ListStateobjekt kan listan innehålla många värden. TTL gäller för varje värde i en lista oberoende av varandra. - För
MapStateobjekt har varje kartnyckel ett associerat tillståndsvärde. TTL gäller oberoende av varje nyckel/värde-par på en karta.
För alla tillståndstyper återställs TTL om tillståndsinformationen uppdateras.
Anteckning
Även om TTL är begränsat till enskilda värden i en ListStateär det enda sättet att uppdatera ett värde i en lista att använda metoden put för att skriva över hela innehållet i variabeln ListState.
Vad är skillnaden mellan timers och TTL?
Det finns viss överlappning mellan timers och time to live (TTL) för tillståndsvariabler, men timers ger en bredare uppsättning funktioner än TTL.
TTL avlägsnar tillståndsinformation som inte har uppdaterats under den period som användaren har angett. På så sätt kan användare förhindra omarkerad tillståndstillväxt och ta bort inaktuella tillståndsposter. Eftersom kartor och listor implementerar TTL för varje värde kan du implementera funktioner som endast tar hänsyn till tillståndsvärden som nyligen har uppdaterats genom att ange TTL.
Med timers kan du definiera anpassad logik utöver att radera tillstånd, inklusive att skicka datauppgifter. Du kan också använda timers för att rensa tillståndsinformation för ett visst tillståndsvärde, med ytterligare flexibilitet för att generera värden eller utlösa annan villkorsstyrd logik baserat på timern.