Delen via


Watermerken toepassen om drempelwaarden voor gegevensverwerking te beheren

In dit artikel worden de basisconcepten van watermerken geïntroduceerd en worden aanbevelingen geboden voor het gebruik van watermerken in algemene stateful streamingbewerkingen. U moet watermerken toepassen op stateful streamingbewerkingen om te voorkomen dat de hoeveelheid gegevens die in de status worden bewaard oneindig wordt uitgebreid, waardoor geheugenproblemen kunnen optreden en de verwerkingslatenties tijdens langdurige streamingbewerkingen kunnen toenemen.

Wat is een watermerk?

Structured Streaming maakt gebruik van watermerken om de drempelwaarde te bepalen voor hoe lang updates voor een bepaalde statusentiteit moeten worden verwerkt. Veelvoorkomende voorbeelden van statusentiteiten zijn:

  • Aggregaties in een tijdvenster.
  • Unieke sleutels in een join tussen twee streams.

Wanneer u een watermerk declareert, geeft u een tijdstempelveld en een grenswaarde op voor een streaming DataFrame. Wanneer er nieuwe gegevens binnenkomen, houdt de statusbeheerder de meest recente tijdstempel in het opgegeven veld bij en verwerkt alle records binnen de drempelwaarde voor late tijd.

In het volgende voorbeeld wordt een drempelwaarde van 10 minuten toegepast op een aantal vensters:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

In dit voorbeeld:

  • De event_time kolom wordt gebruikt om een watermerk van 10 minuten en een tumblingvenster van 5 minuten te definiëren.
  • Er wordt een telling verzameld voor elke id waargenomen voor elke niet-overlappende vensters van 5 minuten.
  • Statusinformatie wordt bijgehouden voor elke telling tot het einde van het venster 10 minuten ouder is dan de meest recente waargenomen event_time.

Belangrijk

Drempelwaarden voor watermerken garanderen dat records die binnen de opgegeven drempelwaarde binnenkomen, worden verwerkt volgens de semantiek van de gedefinieerde query. Late binnenkomende records die buiten de opgegeven drempelwaarde binnenkomen, kunnen nog steeds worden verwerkt met behulp van metrische querygegevens, maar dit is niet gegarandeerd.

Hoe beïnvloeden watermerken de verwerkingstijd en doorvoer?

Watermerken communiceren met uitvoermodi om te bepalen wanneer gegevens naar de sink worden geschreven. Omdat watermerken de totale hoeveelheid statusgegevens verminderen die moeten worden verwerkt, is effectief gebruik van watermerken essentieel voor efficiënte stateful streamingdoorvoer.

Notitie

Niet alle uitvoermodi worden ondersteund voor alle stateful bewerkingen.

Watermerken en uitvoermodus voor gevensterde aggregaties

In de volgende tabel wordt de verwerking van query's met aggregatie op een tijdstempel met een watermerk gedefinieerd:

Uitvoermodus Gedrag
Toevoegen Rijen worden naar de doeltabel geschreven zodra de drempelwaarde voor het watermerk is verstreken. Alle schrijfbewerkingen worden vertraagd op basis van de drempelwaarde voor late tijd. De oude aggregatiestatus wordt verwijderd zodra de drempelwaarde is verstreken.
Bijwerken Rijen worden naar de doeltabel geschreven wanneer de resultaten worden berekend en kunnen worden bijgewerkt en overschreven wanneer nieuwe gegevens binnenkomen. De oude aggregatiestatus wordt verwijderd zodra de drempelwaarde is verstreken.
Voltooid De aggregatiestatus wordt niet verwijderd. De doeltabel wordt herschreven met elke trigger.

Watermerken en uitvoer voor stream-stream-joins

Joins tussen meerdere streams ondersteunen alleen de toevoegmodus en overeenkomende records worden geschreven in elke batch die ze worden gedetecteerd. Voor inner joins raadt Databricks aan om een grenswaarde in te stellen voor elke streaminggegevensbron. Hierdoor kunnen statusgegevens worden verwijderd voor oude records. Zonder watermerken probeert Structured Streaming elke sleutel van beide zijden van de join samen te voegen met elke trigger.

Structured Streaming heeft speciale semantiek ter ondersteuning van outer joins. Watermerken zijn verplicht voor outer joins, zoals wordt aangegeven wanneer een sleutel moet worden geschreven met een null-waarde nadat deze niet overeenkomt. Hoewel outer joins nuttig kunnen zijn voor het opnemen van records die nooit overeenkomen tijdens gegevensverwerking, omdat joins alleen schrijven naar tabellen als toevoegbewerkingen, worden deze ontbrekende gegevens pas geregistreerd nadat de drempelwaarde voor late tijd is verstreken.

De drempelwaarde voor late gegevens beheren met beleid voor meerdere watermerken in Structured Streaming

Wanneer u met meerdere Structured Streaming-invoer werkt, kunt u meerdere watermerken instellen om tolerantiedrempels te beheren voor gegevens die te laat binnenkomen. Door watermerken te configureren, kunt u statusgegevens beheren en latentie beïnvloeden.

Een streamingquery kan meerdere invoerstromen hebben die zijn samengevoegd of samengevoegd. Elk van de invoerstromen kan een andere drempelwaarde hebben voor late gegevens die moeten worden getolereerd voor stateful bewerkingen. Geef deze drempelwaarden op met behulp withWatermarks("eventTime", delay) van elk van de invoerstromen. Hier volgt een voorbeeldquery met stream-stream joins.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

Tijdens het uitvoeren van de query houdt Structured Streaming afzonderlijk de maximale gebeurtenistijd bij die wordt weergegeven in elke invoerstroom, berekent watermerken op basis van de bijbehorende vertraging en kiest u één globaal watermerk met deze voor stateful bewerkingen. Standaard wordt het minimum als globaal watermerk gekozen, omdat er geen gegevens per ongeluk als te laat worden verwijderd als een van de stromen achter de andere stromen valt (bijvoorbeeld een van de streams stopt met het ontvangen van gegevens vanwege upstreamfouten). Met andere woorden, het globale watermerk wordt veilig verplaatst in het tempo van de traagste stroom en de query-uitvoer wordt dienovereenkomstig vertraagd.

Als u snellere resultaten wilt krijgen, kunt u het beleid voor meerdere watermerken instellen om de maximumwaarde als het globale watermerk te kiezen door de SQL-configuratie spark.sql.streaming.multipleWatermarkPolicy in te max stellen op (standaardinstelling min). Hierdoor kan het wereldwijde watermerk in het tempo van de snelste stroom worden verplaatst. Deze configuratie verwijdert echter gegevens uit de langzaamste stromen. Daarom raadt Databricks u aan deze configuratie zorgvuldig te gebruiken.

Dubbele waarden binnen watermerk verwijderen

In Databricks Runtime 13.3 LTS en hoger kunt u records binnen een grenswaarde ontdubbelen met behulp van een unieke id.

Structured Streaming biedt exactly-once verwerkingsgaranties, maar ontdubbelt niet automatisch records uit gegevensbronnen. U kunt dropDuplicatesWithinWatermark records op elk opgegeven veld ontdubbelen, zodat u duplicaten uit een stroom kunt verwijderen, zelfs als sommige velden verschillen (zoals gebeurtenistijd of aankomsttijd).

Dubbele records die binnen het opgegeven watermerk binnenkomen, worden gegarandeerd verwijderd. Deze garantie is strikt in slechts één richting en dubbele records die buiten de opgegeven drempelwaarde aankomen, kunnen ook worden verwijderd. U moet de vertragingsdrempel van watermerk langer instellen dan de maximale tijdstempelverschillen tussen dubbele gebeurtenissen om alle duplicaten te verwijderen.

U moet een watermerk opgeven om de dropDuplicatesWithinWatermark methode te gebruiken, zoals in het volgende voorbeeld:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])