Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Den här sidan beskriver begreppen bakom vattenstämpling och innehåller rekommendationer om hur du använder vattenstämplar i vanliga tillståndsbaserade strömningsoperationer.
Strömmande frågor ackumulerar tillståndsdata över tid. Vattenstämplar tar automatiskt bort gamla tillståndsdata för att förhindra minnesfel och ökad bearbetningsfördröjning.
Vad är en vattenstämpel?
Under bearbetningen behåller Structured Streaming tillstånd mellan mikrobatcherna. Strömmande frågor använder tillstånd för att stegvis uppdatera resultat i stället för att omberäkna allt efter varje mikrobatch. Vattenmärken avgör tröskelvärdet för när en fråga slutar att bearbeta en tillståndsenhet.
Vanliga exempel på tillståndsentiteter är:
- Sammansättningar över ett tidsfönster.
- Unika nycklar i en koppling mellan två strömmar.
Om du vill deklarera en vattenstämpel på en strömmande DataFrame anger du ett tidsstämpelfält och ett tröskelvärde för fördröjning. När nya data tas emot spårar tillståndshanteraren den senaste tidsstämpeln i det angivna fältet och bearbetar endast poster inom tröskelvärdet för fördröjning.
Frågor bearbetar alltid poster som inkommer inom tröskelvärdet. Frågor kan fortfarande bearbeta poster som kommer utanför tröskelvärdet, men detta är inte garanterat.
I följande exempel tillämpas ett tröskelvärde på 10 minuter för ett antal fönster:
Python
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Scala
import org.apache.spark.sql.functions.window
df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
I det här exemplet:
- Kolumnen
event_timeanvänds för att definiera en vattenstämpel på 10 minuter och ett rullande fönster på 5 minuter. - Ett antal registreras för varje
idsom observeras under varje 5-minutersintervall som inte överlappar. - Tillståndsinformation upprätthålls för varje räkning tills fönstrets slut ligger 10 minuter före den senast observerade
event_time.
Viktigt!
I en åtgärd av typen groupBy() och window() refererar du till kolumner med namn, "<colName>" eller col("<colName>"), så att händelsetidsmarkören bevaras. I Scala kan du också använda $colName.
Hur påverkar vattenstämplar bearbetningstiden och dataflödet?
Utdatalägen styr när en fråga med vattenstämplar skriver data till mottagaren. Vattenstämplar är viktiga för dataflödeskontroll i tillståndskänslig strömning eftersom de minskar den totala mängden tillståndsinformation i minnet. Alla utdatalägen stöds inte för alla tillståndskänsliga åtgärder. Se Vattenstämplar och utdataläge för fönsteraggregeringar.
Det finns kompromisser när du väljer en vattenstämpelvaraktighet:
- Kortare vattenstämplar ger kortare frågesvarstid eftersom frågor lagrar mindre tillståndsinformation och skriver resultat när varje vattenstämpelvaraktighet har slutförts. Korta vattenstämplar har dock låg tolerans för sena data.
- Längre vattenmärken har hög tolerans för sent inkommande data. Långa vattenstämplar ökar dock frågefördröjningen eftersom frågor måste lagra mer tillståndsinformation och vänta med att skriva resultat efter en längre vattenstämpelvaraktighet.
Vattenstämplar och utgångsläge för fönsteraggregeringar
I följande tabell visas bearbetningsbeteende för frågor med aggregering på en tidsstämpel och en vattenstämpel:
| Utdataläge | Funktionssätt |
|---|---|
| Lägga till | Frågan skriver rader till måltabellen när vattenstämpeltröskeln har passerat. Alla skrivningar fördröjs baserat på tröskelvärdet för fördröjning. Det gamla aggregeringstillståndet tas bort när tröskelvärdet har passerat. |
| Update | Frågan skriver rader till måltabellen när resultaten beräknas, och frågan kan uppdatera och skriva över rader när nya data tas emot. Det gamla aggregeringstillståndet tas bort när tröskelvärdet har passerat. |
| Klart | Aggregatstatusen tas inte bort. Frågan skriver om måltabellen för varje utlösare. |
Vattenstämplar och utdatalägen för strömströmanslutningar
Kopplingar mellan flera strömmar stöder endast tilläggsläge. Frågor skriver matchade poster för varje batch.
För inre kopplingar rekommenderar Databricks att du anger ett tröskelvärde för vattenstämpel för varje strömmande datakälla så att frågan kan ta bort tillståndsinformation för gamla poster. Utan vattenstämplar försöker Structured Streaming ansluta varje nyckel från båda sidor av kopplingen på varje utlösare, vilket kan påverka prestanda.
För yttre kopplingar är vattenstämpling obligatoriskt. När en post inte matchar skriver frågan ett null-värde för nyckeln. Eftersom kopplingar endast stöder tilläggsläge skrivs inte omatchade poster förrän tröskelvärdet för fördröjning har passerat.
Styr tröskelvärdet för sena data med en policy med flera watermarks
För flera indata till Structured Streaming kan du ange flera vattenmärken för att styra toleransgränserna för för sent inkomna data. Med vattenstämplar kan du styra tillståndsinformation och svarstid.
En direktuppspelningsfråga kan ha flera indataströmmar som är sammankopplade eller kopplade. För tillståndskänsliga åtgärder kan var och en av indataströmmarna kräva ett annat tröskelvärde för sen datatolerans. Ange dessa tröskelvärden med withWatermark("eventTime", delay) på varje indataström. Följande är en exempelfråga med stream-stream-kopplingar.
Python
input_stream1 = ... # delays up to 1 hour
input_stream2 = ... # delays up to 2 hours
(input_stream1.withWatermark("eventTime1", "1 hour")
.join(
input_stream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
)
Scala
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
När du kör frågan med tillståndskänsliga åtgärder spårar Structured Streaming individuellt den maximala händelsetiden för varje indataström, beräknar vattenstämplar baserat på motsvarande fördröjning och fastställer en enda global vattenstämpel. Som standard använder Structured Streaming minimivärdet som global vattenstämpel. Om en ström släpar efter de andra förhindrar en global minimivattenstämpel att frågan oavsiktligt klassificerar data som för sena. Detta kan till exempel inträffa när en av strömmarna slutar ta emot data på grund av uppströmsfel. Den globala vattenstämpeln rör sig säkert i den långsammaste strömmens takt och fördröjer frågeutdata vid behov.
För att minska svarstiden anger du spark.sql.streaming.multipleWatermarkPolicy till max (standard är min) för att använda den snabbaste strömmens vattenstämpel som global vattenstämpel. Den här konfigurationen släpper dock data från de långsammaste strömmarna. Databricks rekommenderar att du tillämpar den här konfigurationen med försiktighet.
Tillämpa vattenstämplar på distinkta åtgärder
Operationen distinct håller reda på varje unik post i tillståndet. Utan watermark växer tillståndet utan gräns och kan orsaka minnesproblem. Ange ett vattenmärke för ett tidsstämpelfält för att begränsa tillståndet och ta bort gamla poster när tröskelvärdet har passerats.
I följande exempel tillämpas en vattenstämpel på en distinct åtgärd:
Python
streamingDf = spark.readStream. ... # columns: eventTime, id, value, ...
# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)
Scala
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...
// Apply watermark before distinct operation
streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
I det här exemplet tar strömningsfrågan bort dubblettposter som anländer inom 1 timme från den senast observerade eventTime. Frågan tar bort tillståndsinformation för deduplicering när tröskelvärdet har överskridits.
Viktigt!
Om du vill deduplicera specifika kolumner i stället för alla kolumner använder du dropDuplicates() eller dropDuplicatesWithinWatermark() i stället för distinct. Se Ta bort dubletter inom vattenstämpelområdet.
Ta bort dubbletter inom vattenstämpeln
I Databricks Runtime 13.3 LTS eller senare kan du använda en unik identifierare för att deduplicera poster inom ett tröskelvärde för vattenstämpel.
Strukturerad direktuppspelning garanterar bearbetning exakt en gång men deduplicerar inte poster från datakällor. Använd dropDuplicatesWithinWatermark för att ta bort dubbletter i alla fält, även om fälten skiljer sig åt mellan dubblettposter, till exempel händelsetid eller ankomsttid.
Med dropDuplicatesWithinWatermark deduplicerar frågor alltid poster som inkommer inom watermark-gränsen. Frågor kan också deduplicera poster som tas emot utanför tröskelvärdet, men detta är inte garanterat. För att garantera att frågor tar bort alla dubbletter anger du vattenstämpelns tröskelvärde till ett värde som är större än den maximala tidsstämpelskillnaden mellan dubbletthändelser.
Du måste ange en vattenstämpel för att använda dropDuplicatesWithinWatermark metoden:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(Seq("guid"))
Exempel på användningsfall
I följande exempel visas avancerade användningsfall för fönster:
Använda rullande fönster för att beräkna försäljningssummor per timme
Tumlande fönster har fast storlek och överlappar inte. Varje indatarad tillhör exakt ett fönster. Använd rullande fönster för att beräkna diskreta aggregeringar för tidsperioder, till exempel försäljningssummor per timme:
Python
from pyspark.sql.functions import window, sum
hourly_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val hourlySales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
I det här exemplet:
-
window("timestamp", "1 hour")grupperar beställningar i icke-överlappande 1-timmarsintervall, till exempel 5 till 6 AM och 6 till 7 AM. -
withWatermark("timestamp", "1 hour")behåller varje fönsters aggregering i minnet tills fönstrets sluttidsstämpel är 1 timme äldre än den senaste ordertidsstämpeln.
Använd glidande fönster för att beräkna rullande aggregat
Glidande fönster har fast storlek med intervall som kan överlappa varandra. En enskild rad kan tillhöra flera fönster. Använd glidande fönster för att beräkna rullande aggregat, till exempel försäljning under en rullande sextimmarsperiod:
Python
from pyspark.sql.functions import window, sum
rolling_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val rollingSales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "6 hours", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
I det här exemplet:
-
window("timestamp", "6 hours", slideDuration="1 hour")grupperar beställningar i 6-timmarsintervall som avancerar med 1 timme, till exempel 5 till 11 AM och 6 AM till 12 PM. -
withWatermark("timestamp", "1 hour")behåller varje fönsters aggregat i tillståndet tills fönstrets sluttidsstämpel är 1 timme äldre än den maximala ordertidsstämpeln. -
slideDurationmåste vara mindre än eller lika medwindowDuration.
Använda sessionsfönster för att kontrollera användaraktivitet
Sessionsfönster har ingen fast storlek. Ett fönster öppnas när en rad kommer och stängs efter en mellanrumsvaraktighet som inte innehåller några nya rader. Använd sessionsfönster för att aggregera aktivitetstoppar mellan långa inaktiva perioder, till exempel en användares sidvyer inom en 30-minuters period:
Python
from pyspark.sql.functions import session_window, sum
sessionized_page_views = (activity
.withWatermark("timestamp", "1 hour")
.groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
.agg(sum("page_views").alias("total_page_views"))
)
Scala
import org.apache.spark.sql.functions.{session_window, sum}
val sessionizedPageViews = activity
.withWatermark("timestamp", "1 hour")
.groupBy($"user_id", session_window($"timestamp", "30 minutes"))
.agg(sum($"page_views").alias("total_page_views"))
I det här exemplet:
-
session_window("timestamp", gapDuration="30 minutes")öppnar ett fönster när den första sidvisningen registreras. Varje efterföljande sidvisning som inkommer inom 30 minuter förlänger tidsfönstret. När ingen sidvy kommer inom 30 minuter stängs fönstret och nästa sidvy startar ett nytt fönster. -
withWatermark("timestamp", "1 hour")håller varje sessions aggregering i tillstånd tills tidsstämpeln för fönstrets slut är 1 timme äldre än den maximala tidsstämpeln för sidvisning. - Argumentet
timeColumnförwindow()ochsession_window()måste vara avTimestampTypeellerTimestampNTZType. - Använd
current_timestamp()för att definiera fönster baserat på bearbetningstid i stället för händelsetid. - Du kan ange fönstervaraktighet från mikrosekunder upp till dagar. Månadsvaraktighet och längre stöds inte.
- Använd
completeutmatningsläge med fönsteraggregeringar för att bevara allt fönstertillstånd på obestämd tid. Användappendutmatningsläge med en lämplig vattenstämpel för att begränsa tillståndstillväxten och förhindra minnesproblem för stora datamängder. Mer information om hur utdataläget fungerar finns i Vattenmärken och utdataläge för fönsteraggregeringar.