Tillämpa vattenstämplar för att kontrollera tröskelvärden för databehandling

Den här artikeln beskriver de grundläggande begreppen vattenstämpel och ger rekommendationer för att använda vattenstämplar i vanliga tillståndskänsliga strömningsåtgärder. Du måste använda vattenstämplar för tillståndskänsliga strömningsåtgärder för att undvika att oändligt utöka mängden data som lagras i tillståndet, vilket kan medföra minnesproblem och öka bearbetningens svarstider under långvariga strömningsåtgärder.

Vad är en vattenstämpel?

Strukturerad direktuppspelning använder vattenstämplar för att kontrollera tröskelvärdet för hur länge uppdateringar för en viss tillståndsentitet ska bearbetas. Vanliga exempel på tillståndsentiteter är:

  • Sammansättningar över ett tidsfönster.
  • Unika nycklar i en koppling mellan två strömmar.

När du deklarerar en vattenstämpel anger du ett tidsstämpelfält och ett tröskelvärde för vattenstämpel på en strömmande DataFrame. När nya data tas emot spårar tillståndshanteraren den senaste tidsstämpeln i det angivna fältet och bearbetar alla poster inom tröskelvärdet för fördröjning.

I följande exempel tillämpas ett tröskelvärde på 10 minuter för ett antal fönster:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

I det här exemplet:

  • Kolumnen event_time används för att definiera en vattenstämpel på 10 minuter och ett rullande fönster på 5 minuter.
  • Ett antal samlas in för varje id observerad för varje icke-överlappande 5-minutersfönster.
  • Tillståndsinformation underhålls för varje antal tills slutet av fönstret är 10 minuter äldre än den senaste observerade event_time.

Viktigt!

Tröskelvärden för vattenstämpel garanterar att poster som kommer inom det angivna tröskelvärdet bearbetas enligt semantiken för den definierade frågan. För sent angivna poster som kommer utanför det angivna tröskelvärdet kan fortfarande bearbetas med hjälp av frågemått, men detta är inte garanterat.

Hur påverkar vattenstämplar bearbetningstiden och dataflödet?

Vattenstämplar interagerar med utdatalägen för att styra när data skrivs till mottagaren. Eftersom vattenstämplar minskar den totala mängden tillståndsinformation som ska bearbetas är effektiv användning av vattenstämplar avgörande för effektivt tillståndskänsligt strömmande dataflöde.

Kommentar

Alla utdatalägen stöds inte för alla tillståndskänsliga åtgärder.

Vattenstämplar och utdataläge för fönsteraggregeringar

I följande tabell beskrivs bearbetning för frågor med aggregering på en tidsstämpel med en definierad vattenstämpel:

Utdataläge Funktionssätt
Lägga till Rader skrivs till måltabellen när vattenstämpeltröskeln har passerat. Alla skrivningar fördröjs baserat på tröskelvärdet för fördröjning. Det gamla aggregeringstillståndet tas bort när tröskelvärdet har passerat.
Uppdatera Rader skrivs till måltabellen när resultaten beräknas och kan uppdateras och skrivas över när nya data tas emot. Det gamla aggregeringstillståndet tas bort när tröskelvärdet har passerat.
Klart Sammansättningstillståndet tas inte bort. Måltabellen skrivs om med varje utlösare.

Vattenstämplar och utdata för strömströmanslutningar

Kopplingar mellan flera strömmar stöder endast tilläggsläge och matchade poster skrivs i varje batch som de identifieras. För inre kopplingar rekommenderar Databricks att du anger ett tröskelvärde för vattenstämpel för varje strömmande datakälla. Detta gör att tillståndsinformation kan ignoreras för gamla poster. Utan vattenstämplar försöker Structured Streaming ansluta varje nyckel från båda sidor av kopplingen med varje utlösare.

Strukturerad direktuppspelning har särskilda semantik för att stödja yttre kopplingar. Vattenstämpling är obligatoriskt för yttre kopplingar, eftersom det anger när en nyckel måste skrivas med ett null-värde efter att ha gått omatchat. Observera att även om yttre kopplingar kan vara användbara för att registrera poster som aldrig matchas under databearbetningen, eftersom kopplingar endast skrivs till tabeller som tilläggsåtgärder, registreras inte dessa saknade data förrän efter att tröskelvärdet för fördröjning har passerat.

Kontrollera tröskelvärdet för sena data med principen för flera vattenstämplar i strukturerad direktuppspelning

När du arbetar med flera indata för strukturerad direktuppspelning kan du ange flera vattenstämplar för att kontrollera toleranströsklar för data som kommer sent. Genom att konfigurera vattenstämplar kan du styra tillståndsinformationen och påverka svarstiden.

En direktuppspelningsfråga kan ha flera indataströmmar som är sammankopplade eller kopplade. Var och en av indataströmmarna kan ha olika tröskelvärden för sena data som måste tolereras för tillståndskänsliga åtgärder. Ange dessa tröskelvärden med var withWatermarks("eventTime", delay) och en av indataströmmarna. Följande är en exempelfråga med stream-stream-kopplingar.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

När du kör frågan spårar Structured Streaming individuellt den maximala händelsetiden som visas i varje indataström, beräknar vattenstämplar baserat på motsvarande fördröjning och väljer en enda global vattenstämpel med dem som ska användas för tillståndskänsliga åtgärder. Som standard väljs minimivärdet som global vattenstämpel eftersom det säkerställer att inga data oavsiktligt tas bort som för sent om en av strömmarna hamnar bakom de andra (till exempel slutar en av strömmarna att ta emot data på grund av uppströmsfel). Med andra ord rör sig den globala vattenstämpeln säkert i den långsammaste strömmens takt och frågeutdata fördröjs i enlighet därmed.

Om du vill få snabbare resultat kan du ange principen för flera vattenstämplar för att välja det maximala värdet som global vattenstämpel genom att ange SQL-konfigurationen spark.sql.streaming.multipleWatermarkPolicy till max (standardvärdet är min). På så sätt kan den globala vattenstämpeln röra sig i den snabbaste strömmens takt. Den här konfigurationen släpper dock data från de långsammaste strömmarna. Därför rekommenderar Databricks att du använder den här konfigurationen på ett omdömesgillt sätt.

Släpp dubbletter i vattenstämpeln

I Databricks Runtime 13.3 LTS och senare kan du deduplicera poster inom ett tröskelvärde för vattenstämpel med hjälp av en unik identifierare.

Strukturerad direktuppspelning ger bearbetningsgarantier exakt en gång, men deduplicerar inte automatiskt poster från datakällor. Du kan använda dropDuplicatesWithinWatermark för att deduplicera poster i ett angivet fält, så att du kan ta bort dubbletter från en dataström även om vissa fält skiljer sig åt (till exempel händelsetid eller ankomsttid).

Dubbletter av poster som tas emot inom den angivna vattenstämpeln kommer garanterat att tas bort. Den här garantin är strikt i endast en riktning, och dubbletter av poster som kommer utanför det angivna tröskelvärdet kan också tas bort. Du måste ange tröskelvärdet för fördröjning för vattenstämpeln som är längre än maximala tidsstämpelskillnader mellan duplicerade händelser för att ta bort alla dubbletter.

Du måste ange en vattenstämpel för att använda dropDuplicatesWithinWatermark metoden, som i följande exempel:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")