Een uitvoermodus voor gestructureerd streamen selecteren
In dit artikel wordt beschreven hoe u een uitvoermodus selecteert voor stateful streaming. Alleen stateful streams met aggregaties vereisen een configuratie van de uitvoermodus.
Joins ondersteunen alleen de toevoeguitvoermodus en de uitvoermodus heeft geen invloed op ontdubbeling. De willekeurige stateful operators mapGroupsWithState
en flatMapGroupsWithState
verzenden records met behulp van hun eigen aangepaste logica, zodat de uitvoermodus van de stream geen invloed heeft op hun gedrag.
Voor stateless streaming gedragen alle uitvoermodi zich hetzelfde.
Als u de uitvoermodus correct wilt configureren, moet u stateful streaming, watermerken en triggers begrijpen. Zie de volgende artikelen:
- Wat is stateful streaming?
- Watermerken toepassen om drempelwaarden voor gegevensverwerking te beheren
- Triggerintervallen voor gestructureerd streamen configureren
Wat is de uitvoermodus?
De uitvoermodus van een Structured Streaming-query bepaalt welke records de operators van de query tijdens elke trigger verzenden. De drie typen records die kunnen worden verzonden, zijn:
- Registreert dat toekomstige verwerking niet verandert.
- De records die zijn gewijzigd sinds de laatste trigger.
- Alle records in de statustabel.
Weten welke typen records moeten worden verzonden, is belangrijk voor stateful operators, omdat een bepaalde rij die door een stateful operator wordt geproduceerd, kan veranderen van trigger naar trigger. Als een operator voor streamingaggregatie bijvoorbeeld meer rijen voor een bepaald venster ontvangt, kunnen de aggregatiewaarden van dat venster veranderen tussen triggers.
Voor stateless operators heeft het onderscheid tussen recordtypen geen invloed op het gedrag van de operator. De records die een staatloze operator verzendt tijdens een trigger, zijn altijd de bronrecords die tijdens die trigger worden verwerkt.
Beschikbare uitvoermodi
Er zijn drie uitvoermodi waarmee een operator aangeeft welke records moeten worden verzonden tijdens een bepaalde trigger:
Uitvoermodus | Beschrijving |
---|---|
Toevoegmodus (standaard) | Streamingquery's worden standaard uitgevoerd in de toevoegmodus. In deze modus verzenden operators alleen rijen die niet veranderen in toekomstige triggers. Stateful operators gebruiken het watermerk om te bepalen wanneer dit gebeurt. |
Updatemodus | In de updatemodus verzenden operators alle rijen die tijdens de trigger zijn gewijzigd, zelfs als de verzonden record in een volgende trigger kan veranderen. |
Volledige modus | De volledige modus werkt alleen met streamingaggregaties. In de volledige modus worden alle resulterende rijen die door de operator zijn geproduceerd, downstream verzonden. |
Overwegingen voor productie
Voor veel stateful streamingbewerkingen moet u kiezen tussen toevoeg- en updatemodi. In de volgende secties worden overwegingen beschreven waarmee u uw beslissing kunt bepalen.
Notitie
De volledige modus heeft sommige toepassingen, maar kan slecht presteren wanneer gegevens worden geschaald. Databricks raadt aan om gerealiseerde weergaven te gebruiken om semantische garanties te krijgen die zijn gekoppeld aan de volledige modus met incrementele verwerking voor veel stateful bewerkingen. Zie Gerealiseerde weergaven gebruiken in Databricks SQL.
Semantiek van toepassingen
Toepassingssemantiek beschrijft hoe downstreamtoepassingen gebruikmaken van de streaminggegevens.
Als downstreamservices één actie moeten ondernemen voor elke downstream-schrijfbewerking, gebruikt u in de meeste gevallen de toevoegmodus. Als u bijvoorbeeld een downstreammeldingsservice hebt die meldingen verzendt voor elke nieuwe record die naar de sink is geschreven, zorgt de toevoegmodus ervoor dat elke record slechts eenmaal wordt geschreven. De updatemodus schrijft de record telkens wanneer de statusinformatie verandert, wat zou leiden tot talloze updates.
Als downstreamservices nieuwe resultaten nodig hebben, zorgt de updatemodus ervoor dat uw sink zo actueel mogelijk blijft. Voorbeelden zijn een machine learning-model dat functies in realtime leest of een analysedashboard waarmee realtime statistische gegevens worden bijgehouden.
Compatibiliteit van operatoren en sinks
Structured Streaming biedt geen ondersteuning voor alle bewerkingen die beschikbaar zijn in Apache Spark en sommige streamingbewerkingen worden niet ondersteund in alle uitvoermodi. Zie de OSS-streamingdocumenten voor meer informatie over operatorbeperkingen.
Niet alle sinks ondersteunen alle uitvoermodi. Beide Delta Lake, die alle beheerde tabellen van Unity Catalog ondersteunt, en Kafka ondersteunt alle uitvoermodi. Zie de OSS-streamingdocumenten voor meer informatie over sinkcompatibiliteit.
Latentie en kosten
De uitvoermodus heeft invloed op de hoeveelheid tijd die moet worden verstreken voordat u een record schrijft. De frequentie en hoeveelheid geschreven gegevens kunnen van invloed zijn op de kosten die zijn gekoppeld aan streamingpijplijnen.
In de toevoegmodus worden stateful operatoren alleen verzonden nadat stateful resultaten zijn voltooid. Dit is ten minste zolang de watermerkvertraging is vertraagd. Een watermerkvertraging in de uitvoermodus voor 1 hour
toevoegen betekent dat uw records ten minste één uur vertraging hebben voordat ze downstream worden verzonden.
De updatemodus resulteert in één schrijfbewerking per trigger per geaggregeerde waarde. Als uw sinkkosten per schrijf per record in rekening worden gebracht, kan dit duur zijn als records vaak worden bijgewerkt voordat de watermerkvertraging verloopt.
Configuratievoorbeelden
In de volgende codevoorbeelden ziet u hoe u de uitvoermodus configureert voor streaming-updates voor Unity Catalog-tabellen:
Python
# Append output mode (default)
(df.writeStream
.toTable("target_table")
)
# Append output mode (same as default behavior)
(df.writeStream
.outputMode("append")
.toTable("target_table")
)
# Update output mode
(df.writeStream
.outputMode("update")
.toTable("target_table")
)
# Complete output mode
(df.writeStream
.outputMode("complete")
.toTable("target_table")
)
Scala
// Append output mode (default)
df.writeStream
.toTable("target_table")
// Append output mode (same as default behavior)
df.writeStream
.outputMode("append")
.toTable("target_table")
// Update output mode
df.writeStream
.outputMode("update")
.toTable("target_table")
// Complete output mode
df.writeStream
.outputMode("complete")
.toTable("target_table")
Zie OSS-documenten voor PySpark DataStreamWriter.outputMode of Scala DataStreamWriter.outputMode.
Voorbeeld van stateful streaming- en uitvoermodi
Het volgende voorbeeld is bedoeld om u te helpen redeneren door de manier waarop de uitvoermodus communiceert met watermerken voor stateful streaming.
Overweeg een streamingaggregatie die de totale omzet berekent die elk uur in een winkel wordt gegenereerd met een watermerkvertraging van 15 minuten. De eerste microbatch verwerkt de volgende records:
- $ 15 om 2:40 uur
- $ 10 om 2:30 uur
- $ 30 om 13:10 uur
Op dit moment is het watermerk van de motor 2:55 uur, omdat het 15 minuten (de vertraging) aftrekken van de maximale tijd die wordt gezien (17:10). De operator voor streamingaggregatie heeft de volgende status:
[2pm, 3pm]
: $ 25[3pm, 4pm]
: $ 30
In de volgende tabel ziet u wat er in elke uitvoermodus zou gebeuren:
Uitvoermodus | Resultaat en reden |
---|---|
Toevoegen | De operator voor streamingaggregatie verzendt niets downstream. Dit komt doordat beide vensters kunnen veranderen naarmate er nieuwe waarden worden weergegeven met een volgende trigger: het watermerk van 2:55 uur geeft aan dat records na 2:55 uur mogelijk nog steeds binnenkomen en deze records kunnen vallen in het [2pm, 3pm] venster of het [3pm, 4pm] venster. |
Bijwerken | De operator verzendt beide records, omdat beide records updates hebben ontvangen. |
Voltooid | De operator verzendt alle records. |
Stel nu dat de stream nog een record ontvangt:
- $ 20 om 13:20 uur
Het watermerk wordt bijgewerkt naar 15:05 uur omdat de engine 15 minuten van 15:20 uur aftrekken. Op dit moment heeft de operator voor streamingaggregatie het volgende in de status:
[2pm, 3pm]
: $ 25[3pm, 4pm]
: $ 50
In de volgende tabel ziet u wat er in elke uitvoermodus zou gebeuren:
Uitvoermodus | Resultaat en reden |
---|---|
Toevoegen | De operator voor streamingaggregatie ziet dat het watermerk van 15:05 uur groter is dan het einde van het [2pm, 3pm] venster. Door de definitie van het watermerk kan dat venster niet meer worden gewijzigd, zodat het venster wordt [2pm, 3pm] verzonden. |
Bijwerken | De operator voor streamingaggregatie verzendt het [3pm, 4pm] venster omdat de statuswaarde is gewijzigd van $ 30 in $ 50. |
Voltooid | De operator verzendt alle records. |
Hieronder ziet u hoe stateful operators zich gedragen in elke toevoegmodus:
- Schrijf records eenmaal na de watermerkvertraging in de toevoegmodus.
- Schrijf in de updatemodus records die zijn gewijzigd sinds de vorige trigger.
- Schrijf in de volledige modus alle records die zijn geproduceerd door de stateful operator.