Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Belangrijk
Deze functie bevindt zich in Openbare Preview in Databricks Runtime 16.2 en hoger.
U kunt streamingtoepassingen bouwen met behulp van aangepaste stateful operators om lage latentie en bijna realtime oplossingen te implementeren die gebruikmaken van willekeurige stateful logica. Aangepaste operators met status ontketenen nieuwe operationele toepassingen en patronen die niet beschikbaar zijn via traditionele verwerking van gestructureerde streaming.
Notitie
Databricks raadt aan om ingebouwde Structured Streaming-functionaliteit te gebruiken voor ondersteunde stateful bewerkingen, zoals aggregaties, ontdubbeling en streaming joins. Zie Wat is 'stateful streaming'?.
Databricks raadt aan transformWithState te gebruiken voor verouderde operators voor willekeurige statustransformaties. Raadpleeg flatMapGroupsWithStatevoor documentatie over de verouderde mapGroupsWithState- en -operators.
Eisen
De transformWithState-operator en de bijbehorende API's en klassen hebben de volgende vereisten:
- Beschikbaar in Databricks Runtime 16.2 en hoger.
- Compute moet gebruikmaken van de toegewezen of niet-isolatietoegangsmodus, behalve de standaardtoegangsmodus wordt ondersteund voor Python (
transformWithStateInPandas) in Databricks Runtime 16.3 en hoger, en voor Scala (transformWithState) in Databricks Runtime 17.3 en hoger. - U moet de RocksDB state store-provider gebruiken. Databricks raadt aan om RocksDB in te schakelen als onderdeel van de rekenconfiguratie.
Notitie
Voer het volgende uit om de provider van de rocksDB-statusopslag voor de huidige sessie in te schakelen:
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
Wat is transformWithState?
De operator transformWithState past een aangepaste stateful processor toe op een Structured Streaming-query. U moet een aangepaste stateful processor implementeren om transformWithStatete gebruiken. Structured Streaming bevat API's voor het bouwen van uw stateful processor met behulp van Python, Scala of Java.
U gebruikt transformWithState om aangepaste logica toe te passen op een groeperingssleutel voor records die incrementeel worden verwerkt met Structured Streaming. Hieronder wordt het ontwerp op hoog niveau beschreven:
- Definieer een of meer statusvariabelen.
- Statusinformatie wordt bewaard voor elke groeperingssleutel en kan worden geopend voor elke statusvariabele volgens door de gebruiker gedefinieerde logica.
- Voor elke verwerkte micro-batch zijn alle records voor de specifieke sleutel beschikbaar als iterator.
- Gebruik ingebouwde ingangen om te bepalen wanneer en hoe records worden verzonden op basis van timers en door de gebruiker gedefinieerde voorwaarden.
- Statuswaarden ondersteunen afzonderlijke TTL-definities (Time-to-Live), waardoor de statusverlooptijd en statusgrootte kunnen worden beheerd.
Omdat transformWithState ondersteuning biedt voor de ontwikkeling van schema's in het statusarchief, kunt u uw productietoepassingen herhalen en bijwerken zonder historische statusgegevens te verliezen of records opnieuw te moeten verwerken, waardoor u flexibiliteit hebt voor ontwikkeling en onderhoud. Bekijk de ontwikkeling van schema's in het statusarchief.
Belangrijk
PySpark gebruikt de operator transformWithStateInPandas in plaats van transformWithState. Azure Databricks-documentatie maakt gebruik van transformWithState om functionaliteit te beschrijven voor zowel Python- als Scala-implementaties.
De Scala- en Python-implementaties van transformWithState en gerelateerde API's verschillen vanwege taalspecifieke gegevens, maar bieden dezelfde functionaliteit. Raadpleeg taalspecifieke voorbeelden en API-documentatie voor de programmeertaal van uw voorkeur.
Ingebouwde verwerkingsgrepen
U implementeert de kernlogica voor uw aangepaste stateful toepassing door handlers te implementeren met en gebruik te maken van ingebouwde handles.
- Handles bieden de methoden om te communiceren met statuswaarden en timers, binnenkomende records verwerken en records verzenden.
- Handlers definiëren uw aangepaste gebeurtenisgestuurde logica.
Ingangen voor elk statustype worden geïmplementeerd op basis van de onderliggende gegevensstructuur, maar elk bevat functionaliteit voor het ophalen, plaatsen, bijwerken en verwijderen van records.
Handlers worden geïmplementeerd op basis van ofwel gebeurtenissen die worden waargenomen in invoerrecords of timers, met gebruikmaking van de volgende semantiek:
- Definieer een handler met behulp van de
handleInputRowsmethode om te bepalen hoe gegevens worden verwerkt, de status wordt bijgewerkt en records worden verzonden voor elke microbatch met records die voor de groeperingssleutel worden verwerkt. Zie Invoerrijenverwerken. - Definieer een handler met behulp van de methode
handleExpiredTimerom drempelwaarden op basis van tijd te gebruiken om logica uit te voeren, ongeacht of er aanvullende records worden verwerkt voor de groeperingssleutel. Zie geplande programma-evenementen .
De volgende tabel bevat een vergelijking van functionele gedragingen die worden ondersteund door deze handlers:
| Gedrag | handleInputRows |
handleExpiredTimer |
|---|---|---|
| Toestandswaarden ophalen, plaatsen, bijwerken of wissen | Ja | Ja |
| Een timer maken of verwijderen | Ja | Ja |
| Records uitsturen | Ja | Ja |
| Itereer over records in de huidige micro-batch | Ja | Nee |
| Triggerlogica op basis van verstreken tijd | Nee | Ja |
U kunt handleInputRows en handleExpiredTimer combineren om zo nodig complexe logica te implementeren.
U kunt bijvoorbeeld een toepassing implementeren die gebruikmaakt van handleInputRows om statuswaarden voor elke microbatch bij te werken en in de toekomst een timer van 10 seconden in te stellen. Als er geen extra records worden verwerkt, kunt u handleExpiredTimer gebruiken om de huidige waarden in het statusarchief te verzenden. Als nieuwe records worden verwerkt voor de groeperingssleutel, kunt u de bestaande timer wissen en een nieuwe timer instellen.
aangepaste statustypen
U kunt meerdere statusobjecten implementeren in één stateful operator. De namen die u aan elk statusobject geeft, blijven behouden in het statusarchief, waartoe u toegang hebt met de lezer van het statusarchief. Als uw statusobject een StructTypegebruikt, geeft u namen op voor elk veld in de struct tijdens het doorgeven van het schema. Deze namen zijn ook zichtbaar bij het lezen van de statusopslag. Zie Lees informatie over de status van Gestructureerd Streamen.
De functionaliteit van ingebouwde klassen en operators is bedoeld om flexibiliteit en uitbreidbaarheid te bieden, en implementatieopties moeten worden geïnformeerd door de volledige logica die uw toepassing moet uitvoeren. U kunt bijvoorbeeld bijna identieke logica implementeren met behulp van een ValueState gegroepeerd op velden user_id en session_id of een MapState gegroepeerd op user_id waarbij session_id de sleutel is voor de MapState. In dit geval kan een MapState de voorkeurs implementatie zijn als logica voorwaarden moet evalueren voor meerdere session_ids.
In de volgende secties worden de statustypen beschreven die worden ondersteund door transformWithState.
ValueState
Voor elke groeperingssleutel is er een gekoppelde waarde.
Een waardestatus kan complexe typen bevatten, zoals een struct of tuple. Wanneer u een ValueStatebijwerkt, implementeert u logica om de volledige waarde te vervangen. De TTL voor een waardestatus wordt opnieuw ingesteld wanneer de waarde wordt bijgewerkt, maar niet opnieuw wordt ingesteld als een bronsleutel die overeenkomt met een ValueState wordt verwerkt zonder de opgeslagen ValueStatebij te werken.
ListState
Voor elke groeperingssleutel is er een gekoppelde lijst.
Een lijststatus is een verzameling waarden, die elk complexe typen kunnen bevatten. Elke waarde in een lijst heeft een eigen TTL. U kunt items toevoegen aan een lijst door afzonderlijke items toe te voegen, een lijst met items toe te voegen of de hele lijst te overschrijven met een put. Alleen de putbewerking wordt beschouwd als een update voor het opnieuw instellen van TTL.
MapState
Voor elke groeperingssleutel is er een gekoppelde kaart. Kaarten zijn het functionele Apache Spark-equivalent aan een Python-dict.
Belangrijk
Met groeperingssleutels worden de velden beschreven die zijn opgegeven in de GROUP BY-component van de Structured Streaming-query. Kaarttoestanden bevatten een willekeurig aantal sleutel-waarde paren voor een groeperingssleutel.
Als u bijvoorbeeld groepeert op user_id en een kaart wilt definiëren voor elke session_id, is de groeperingssleutel user_id en wordt de sleutel in de kaart session_id.
Een kaartstatus is een verzameling afzonderlijke sleutels die elk aan een waarde worden gekoppeld, welke complexe typen kan bevatten. Elk sleutel-waardepaar in een kaart heeft een eigen TTL. U kunt de waarde van een specifieke sleutel bijwerken of een sleutel en de bijbehorende waarde verwijderen. U kunt een afzonderlijke waarde retourneren met behulp van de sleutel, alle sleutels weergeven, alle waarden weergeven of een iterator retourneren om te werken met de volledige set sleutel-waardeparen in de kaart.
Een aangepaste statusvariabele initialiseren
Wanneer u uw StatefulProcessorinitialiseert, maakt u een lokale variabele voor elk statusobject waarmee u kunt communiceren met statusobjecten in uw aangepaste logica. Statusvariabelen worden gedefinieerd en geïnitialiseerd door de ingebouwde init methode in de klasse StatefulProcessor te overschrijven.
U definieert een willekeurige hoeveelheid statusobjecten met behulp van de getValueState, getListStateen getMapState methoden tijdens het initialiseren van uw StatefulProcessor.
Elk statusobject moet het volgende hebben:
- Een unieke naam
- Een opgegeven schema
- In Python wordt het schema expliciet opgegeven.
- Geef in Scala een
Encoderdoor om het statusschema op te geven.
U kunt ook een optionele TTL-duur (time-to-live) opgeven in milliseconden. Als u een kaartstatus implementeert, moet u een afzonderlijke schemadefinitie opgeven voor de kaartsleutels en de waarden.
Notitie
Logica voor de manier waarop statusinformatie wordt opgevraagd, bijgewerkt en verzonden, wordt afzonderlijk verwerkt. Zie Gebruik uw statusvariabelen.
Voorbeeld van een "stateful" toepassing
Hieronder ziet u de basissyntaxis voor het definiëren en gebruiken van een aangepaste stateful processor met transformWithState, inclusief voorbeeldstatusvariabelen voor elk ondersteund type. Zie Voorbeeld van stateful toepassingenvoor meer voorbeelden.
Notitie
Python maakt gebruik van tuples voor alle interacties met statuswaarden. Dit betekent dat Python-code waarden moet doorgeven met behulp van tuples bij het gebruik van bewerkingen zoals put en update en tuples moet verwerken wanneer getwordt gebruikt.
Als het schema voor uw waardestatus bijvoorbeeld slechts één geheel getal is, implementeert u code als volgt:
current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0] # Extracts the first item in the tuple
new_value = current_value + 1 # Calculate a new value
value_state.update((new_value,)) # Pass the new value formatted as a tuple
Dit geldt ook voor items in een ListState of waarden in een MapState.
Python
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
# Schema can also be defined using strings and SQL DDL syntax
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
count = 0
for pdf in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
pdf_count = pdf.count()
count += pdf_count.get("value")
self.value_state.update((count,)) # Count is passed as a tuple
iter = self.list_state.get()
list_state_value = next(iter1)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield pd.DataFrame({"id": key, "countAsString": str(count)})
q = (df.groupBy("key")
.transformWithStateInPandas(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
Scala
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
@transient private var countState: ValueState[Int] = _
@transient private var listState: ListState[Int] = _
@transient private var mapState: MapState[String, Int] = _
override def init(
outputMode: OutputMode,
timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState",
Encoders.scalaLong, TTLConfig.NONE)
listState = getHandle.getListState[Int]("listState",
Encoders.scalaInt, TTLConfig.NONE)
mapState = getHandle.getMapState[String, Int]("mapState",
Encoders.STRING, Encoders.scalaInt, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
var count = countState.getOption().getOrElse(0)
for (row <- inputRows) {
val listData = Array(120, 20)
listState.put(listData)
listState.appendValue(count)
listState.appendList(listData)
count += 1
}
val iter = listState.get()
var listStateValue = 0
if (iter.hasNext) {
listStateValue = iter.next()
}
countState.update(count)
var value = count
val userKey = "userKey"
if (mapState.exists()) {
if (mapState.containsKey(userKey)) {
value += mapState.getValue(userKey)
}
}
mapState.updateValue(userKey, value)
Iterator((key, count.toString))
}
}
val q = spark
.readStream
.format("delta")
.load("$srcDeltaTableDir")
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new SimpleCounterProcessor(),
TimeMode.None(),
OutputMode.Update(),
)
.writeStream...
StatefulProcessorHandle
PySpark bevat de StatefulProcessorHandle-klasse om toegang te bieden tot functies die bepalen hoe uw door de gebruiker gedefinieerde Python-code communiceert met statusinformatie. U moet de StatefulProcessorHandle altijd importeren en doorgeven aan de variabele handle bij het initialiseren van een StatefulProcessor.
De handle variabele koppelt de lokale variabele in uw Python-klasse aan de statusvariabele.
Notitie
Scala maakt gebruik van de methode getHandle.
Beginstatus opgeven
U kunt desgewenst een initiële status opgeven voor gebruik met de eerste microbatch. Dit kan handig zijn wanneer u een bestaande werkstroom migreert naar een nieuwe aangepaste toepassing, een stateful operator bijwerkt om uw schema of logica te wijzigen of een fout herstelt die niet automatisch kan worden hersteld en handmatige tussenkomst vereist.
Notitie
Gebruik de lezer van het statusarchief om statusgegevens van een bestaand controlepunt op te vragen. Zie Lees informatie over de status van Gestructureerd Streamen.
Als u een bestaande Delta-tabel converteert naar een stateful applicatie, gebruik dan spark.read.table("table_name") om de tabel te lezen en geef het resulterende DataFrame door. U kunt desgewenst velden selecteren of wijzigen om aan te passen aan uw nieuwe statusgevoelige toepassing.
U geeft een initiële status op met behulp van een DataFrame met hetzelfde groeperingssleutelschema als de invoerrijen.
Notitie
Python gebruikt handleInitialState om de initiële status op te geven tijdens het definiëren van een StatefulProcessor. Scala maakt gebruik van de afzonderlijke klasse StatefulProcessorWithInitialState.
Uw statusvariabelen gebruiken
Ondersteunde statusobjecten bieden methoden voor het ophalen van status, het bijwerken van bestaande statusgegevens of het wissen van de huidige status. Elk ondersteund statustype heeft een unieke implementatie van methoden die overeenkomen met de geïmplementeerde gegevensstructuur.
Elke geobserveerde groeperingssleutel heeft specifieke statusinformatie.
- Records worden verzonden op basis van de logica die u implementeert en het uitvoerschema gebruikt dat u opgeeft. Zie Records uitgeven.
- U kunt waarden in het statusarchief openen met behulp van de
statestore-lezer. Deze lezer heeft batchfunctionaliteit en is niet bedoeld voor workloads met lage latentie. Zie Lees informatie over de status van Gestructureerd Streamen. - Logica die is opgegeven met
handleInputRowswordt alleen geactiveerd als records voor de sleutel aanwezig zijn in een micro-batch. Zie Invoerrijenverwerken. - Gebruik
handleExpiredTimerom tijdgebaseerde logica te implementeren die niet afhankelijk is van het observeren van records om te activeren. Zie geplande programma-evenementen .
Notitie
Statusobjecten worden geïsoleerd door sleutels te groeperen met de volgende implicaties:
- Statuswaarden kunnen niet worden beïnvloed door records die zijn gekoppeld aan een andere groeperingssleutel.
- Je kan geen logica implementeren die afhankelijk is van het vergelijken van waarden of het bijwerken van de status tussen groeperingssleutels.
U kunt waarden in een groeperingssleutel vergelijken. Gebruik een MapState om logica te implementeren met een tweede sleutel die uw aangepaste logica kan gebruiken. Als u bijvoorbeeld groepeert op user_id en uw MapState sleutelt met behulp van ip-adres, kunt u logica implementeren waarmee gelijktijdige gebruikerssessies worden bijgehouden.
Geavanceerde overwegingen voor het omgaan met status
Schrijven naar een statusvariabele activeert een schrijfbewerking naar RocksDB. Voor geoptimaliseerde prestaties raadt Databricks aan om alle waarden in de iterator voor een bepaalde sleutel te verwerken en waar mogelijk updates door te voeren in één schrijfbewerking.
Notitie
Statusupdates zijn fouttolerant. Als een taak vastloopt voordat een microbatch is verwerkt, wordt de waarde van de laatste geslaagde microbatch gebruikt bij het opnieuw proberen.
Statuswaarden hebben geen ingebouwde standaardwaarden. Als voor uw logica bestaande statusgegevens moeten worden gelezen, gebruikt u de exists methode tijdens het implementeren van uw logica.
Notitie
MapState variabelen aanvullende functionaliteit hebben om te controleren op afzonderlijke sleutels of om alle sleutels weer te geven om logica voor null-status te implementeren.
Records uitsturen
Door de gebruiker gedefinieerde logica bepaalt hoe transformWithState records verzendt. Records worden per groeperingssleutel verzonden.
Aangepaste stateful toepassingen maken geen veronderstellingen over hoe statusgegevens worden gebruikt bij het bepalen hoe records moeten worden verzonden en het geretourneerde aantal records voor een bepaalde voorwaarde kan geen, één of veel zijn.
U implementeert logica voor het verzenden van records met behulp van handleInputRows of handleExpiredTimer. Zie Omgaan met invoerrijen en verwerk programma getimede gebeurtenissen .
Notitie
U kunt meerdere statuswaarden implementeren en meerdere voorwaarden definiëren voor het verzenden van records, maar alle verzonden records moeten hetzelfde schema gebruiken.
Python
In Python definieert u uw uitvoerschema met behulp van het trefwoord outputStructType tijdens het aanroepen van transformWithStateInPandas.
U verzendt records met behulp van een Pandas DataFrame-object en yield.
U kunt desgewenst yield een leeg DataFrame gebruiken. Wanneer u in combinatie met update uitvoermodus een leeg DataFrame verzendt, worden de waarden voor de groeperingssleutel bijgewerkt om null te zijn.
Scala
In Scala verzendt u records met behulp van een Iterator-object. Het schema van de uitvoer is afgeleid van verzonden records.
U kunt desgewenst een lege Iteratorverzenden. Wanneer gecombineerd met update-uitvoermodus, zorgt het uitzenden van een lege Iterator ervoor dat de waarden voor de groeperingssleutel null worden.
Invoerrijen verwerken
Gebruik de methode handleInputRows om de logica te definiëren voor de manier waarop records die in uw streamingquery worden waargenomen, communiceren met en statuswaarden bijwerken. De handler die u definieert met de methode handleInputRows wordt uitgevoerd telkens wanneer records worden verwerkt via uw Structured Streaming-query.
Voor de meeste stateful toepassingen die zijn geïmplementeerd met transformWithState, wordt de kernlogica gedefinieerd met behulp van handleInputRows.
Voor elke verwerkte microbatch-update zijn alle records in de microbatch voor een bepaalde groeperingssleutel beschikbaar met behulp van een iterator. Door de gebruiker gedefinieerde logica kan communiceren met alle records uit de huidige microbatch en waarden in de statestore.
Geprogrammeerde tijdgebeurtenissen
U kunt timers gebruiken om aangepaste logica te implementeren op basis van verstreken tijd uit een opgegeven voorwaarde.
U werkt met timers door een handleExpiredTimer methode te implementeren.
Binnen een groeperingssleutel worden timers uniek geïdentificeerd door hun tijdstempel.
Wanneer een timer verloopt, wordt het resultaat bepaald door de logica die in uw toepassing is geïmplementeerd. Veelvoorkomende patronen zijn:
- Informatie verzenden die is opgeslagen in een statusvariabele.
- Opgeslagen statusgegevens verwijderen.
- Een nieuwe timer maken.
Verlopen timers worden afgevuurd, zelfs als er in een micro-batch geen records voor de bijbehorende sleutel worden verwerkt.
Het tijdmodel opgeven
Wanneer u uw StatefulProcessor doorgeeft aan transformWithState, moet u het tijdmodel opgeven. De volgende opties worden ondersteund:
ProcessingTimeEventTime-
NoTimeofTimeMode.None()
Het opgeven van NoTime betekent dat timers niet worden ondersteund voor uw processor.
Ingebouwde timerwaarden
Databricks raadt af om de systeemklok in uw aangepaste stateful toepassing aan te roepen, omdat dit kan leiden tot onbetrouwbare herhalingen bij het mislukken van een taak. Gebruik de methoden in de klasse TimerValues wanneer u toegang moet hebben tot de verwerkingstijd of het watermerk:
TimerValues |
Beschrijving |
|---|---|
getCurrentProcessingTimeInMs |
Retourneert de tijdstempel van de verwerkingstijd voor de huidige batch in milliseconden sinds epoch. |
getCurrentWatermarkInMs |
Retourneert de tijdstempel van het watermerk voor de huidige batch in milliseconden sinds epoch. |
Notitie
De verwerkingstijd beschrijft de tijd die de microbatch door Apache Spark wordt verwerkt. Veel streamingbronnen, zoals Kafka, bevatten ook systeemverwerkingstijd.
Watermerken voor streamingquery's worden vaak gedefinieerd op basis van gebeurtenistijd of de verwerkingstijd van de streamingbron. Zie Watermerken toepassen om drempelwaarden voor gegevensverwerking te beheren.
Zowel watermerken als vensters kunnen worden gebruikt in combinatie met transformWithState. U kunt vergelijkbare functionaliteit implementeren in uw aangepaste stateful toepassing door gebruik te maken van TTL, timers en MapState of ListState functionaliteit.
Wat is state time to live (TTL)?
De statuswaarden die worden gebruikt door transformWithState ondersteunen elk een optionele TTL-specificatie (Time to Live). Wanneer TTL verloopt, wordt de waarde verwijderd uit het statusarchief. TTL communiceert alleen met waarden in het statusarchief, wat betekent dat u logica kunt implementeren om statusgegevens te verwijderen, maar u kunt logica niet rechtstreeks activeren als TTL statuswaarden verwijdert.
Belangrijk
Als u TTL niet implementeert, moet u statusverwijdering afhandelen met behulp van andere logica om eindeloze groei van staten te voorkomen.
TTL wordt afgedwongen voor elke statuswaarde, met verschillende regels voor elk statustype.
- Statusvariabelen zijn gericht op het groeperen van sleutels.
- Voor
ValueStateobjecten wordt slechts één waarde per groeperingssleutel opgeslagen. TTL is van toepassing op deze waarde. - Voor
ListStateobjecten kan de lijst veel waarden bevatten. TTL is onafhankelijk van toepassing op elke waarde in een lijst. - Voor
MapStateobjecten heeft elke kaartsleutel een bijbehorende statuswaarde. TTL is onafhankelijk van toepassing op elk sleutel-waardepaar in een kaart.
Voor alle statustypen wordt TTL opnieuw ingesteld als de statusgegevens worden bijgewerkt.
Notitie
Hoewel TTL is gericht op afzonderlijke waarden in een ListState, is de enige manier om een waarde in een lijst bij te werken de put methode te gebruiken om de volledige inhoud van de ListState variabele te overschrijven.
Wat is het verschil tussen timers en TTL?
Er is sprake van enige overlapping tussen timers en time to live (TTL) voor statusvariabelen, maar timers bieden een bredere set functies dan TTL.
TTL verwijdert statusinformatie die niet is bijgewerkt voor de periode die door de gebruiker is opgegeven. Hierdoor kunnen gebruikers niet-gecontroleerd statusgroei voorkomen en verouderde statusvermeldingen verwijderen. Omdat mappings en lijsten TTL implementeren voor elke waarde, kunt u functies implementeren die alleen rekening houden met toestandwaarden die onlangs zijn bijgewerkt door de TTL in te stellen.
Met timers kunt u aangepaste logica definiëren buiten het verwijderen van statussen, inclusief het verzenden van records. U kunt optioneel timers gebruiken om statusinformatie voor een bepaalde statuswaarde te wissen, met de extra flexibiliteit om waarden te verzenden of andere voorwaardelijke logica te activeren op basis van de timer.