Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Diese Seite beschreibt Wasserzeichenkonzepte und enthält Empfehlungen für die Verwendung von Wasserzeichen in gängigen zustandsbehafteten Streaming-Operationen.
Streamingabfragen sammeln Zustandsdaten im Laufe der Zeit. Wasserzeichen entfernen automatisch alte Zustandsdaten, um Speicherfehler und erhöhte Verarbeitungslatenz zu verhindern.
Was ist ein Wasserzeichen?
Während der Verarbeitung behält strukturiertes Streaming den Status über Mikrobatches hinweg bei. Streamingabfragen verwenden den Zustand, um Ergebnisse inkrementell zu aktualisieren, anstatt alles nach jedem Mikrobatch neu zu komputieren. Wasserzeichen legen den Schwellenwert fest, ab dem eine Abfrage die Verarbeitung einer Zustandsentität beendet.
Zu den häufigen Beispielen von staatlichen Einrichtungen gehören:
- Aggregationen über ein Zeitfenster.
- Eindeutige Schlüssel in einer Verknüpfung zwischen zwei Streams.
Um ein Wasserzeichen für einen Streaming-DataFrame zu definieren, geben Sie ein Zeitstempelfeld und einen Schwellenwert für die zulässige Verspätung an. Wenn neue Daten eingehen, verfolgt der Zustandsmanager den letzten Zeitstempel im angegebenen Feld und verarbeitet nur Datensätze innerhalb des Spätheitsschwellenwerts.
Abfragen verarbeiten immer Datensätze, die innerhalb des Schwellenwerts eingehen. Abfragen verarbeiten möglicherweise weiterhin Datensätze, die außerhalb des Schwellenwerts eingehen, aber dies ist nicht garantiert.
Im folgenden Beispiel wird ein Wasserzeichenschwellenwert von 10 Minuten auf eine fensterübergreifende Zählung angewendet.
Python
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Scala
import org.apache.spark.sql.functions.window
df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
In diesem Beispiel:
- Die Spalte
event_timewird verwendet, um ein Wasserzeichen von 10 Minuten und ein rollierendes Fenster von 5 Minuten zu definieren. - Für jedes beobachtete
idwird für jedes nicht überlappende 5-Minuten-Fenster eine Anzahl erfasst. - Zustandsinformationen werden für jeden Zählerstand beibehalten, bis das Ende des Fensters 10 Minuten vor dem zuletzt beobachteten
event_timeliegt.
Wichtig
Verweisen Sie in einer groupBy() und window() Operation auf Spalten anhand ihres Namens, "<colName>" oder col("<colName>"), um sicherzustellen, dass die Ereigniszeitmarkierung beibehalten wird. In Scala können Sie auch verwenden $colName.
Wie wirken sich Wasserzeichen auf die Verarbeitungszeit und den Durchsatz aus?
Ausgabemodi steuern, wenn eine Abfrage mit Wasserzeichen Daten in die Spüle schreibt. Wasserzeichen sind für die Durchsatzsteuerung im zustandsbehafteten Streaming unerlässlich, da sie die Gesamtmenge der Zustandsinformationen im Arbeitsspeicher reduzieren. Nicht alle Ausgabemodi werden für alle zustandsbehafteten Operationen unterstützt. Siehe Wasserzeichen und Ausgabemodus für Fensteraggregationen.
Die Auswahl der Dauer eines Wasserzeichens bringt Kompromisse mit sich:
- Kürzere Wasserzeichen verringern die Abfragelatenz, weil Abfragen weniger Statusinformationen speichern und nach Ablauf jeder Wasserzeichendauer Ergebnisse ausgeben. Kurze Wasserzeichen haben jedoch eine geringe Toleranz für späte Daten.
- Längere Wasserzeichen haben eine hohe Toleranz für späte Daten. Lange Wasserzeichen erhöhen jedoch die Abfragelatenz, da Abfragen mehr Zustandsinformationen speichern müssen und mit dem Schreiben von Ergebnissen bis zum Ablauf einer längeren Wasserzeichendauer warten müssen.
Wasserzeichen und Ausgabemodus für Fensteraggregationen
Die folgende Tabelle zeigt das Verarbeitungsverhalten für Abfragen mit Aggregation auf einem Zeitstempel und einem Wasserzeichen:
| Ausgabemodus | Verhalten |
|---|---|
| Anfügen | Die Abfrage schreibt Zeilen in die Zieltabelle, nachdem der Wasserzeichenschwellenwert überschritten wurde. Alle Schreibvorgänge werden basierend auf dem Schwellenwert für Verspätungen verzögert. Der alte Aggregationszustand wird gelöscht, nachdem der Schwellenwert überschritten wurde. |
| Aktualisieren | Die Abfrage schreibt Zeilen in die Zieltabelle, wenn Ergebnisse berechnet werden, und die Abfrage kann Zeilen aktualisieren und überschreiben, wenn neue Daten eingehen. Der alte Aggregationszustand wird gelöscht, nachdem der Schwellenwert überschritten wurde. |
| Abgeschlossen | Der Aggregationsstatus wird nicht gelöscht. Die Abfrage schreibt die Zieltabelle für jeden Trigger neu. |
Watermarks und Ausgabemodi für Stream-Stream-Joins
Verknüpfungen zwischen mehreren Datenströmen unterstützen nur den Anfügemodus. Abfragen schreiben übereinstimmene Datensätze für jeden Batch.
Bei inneren Verknüpfungen empfiehlt Databricks, einen Wasserzeichenschwellenwert für jede Streamingdatenquelle festzulegen, damit die Abfrage Zustandsinformationen für alte Datensätze verwerfen kann. Ohne Wasserzeichen versucht Strukturiertes Streaming, jeden Schlüssel von beiden Seiten der Verknüpfung auf jedem Trigger zu verbinden, was sich auf die Leistung auswirken kann.
Bei äußeren Verknüpfungen ist die Wasserzeichenerstellung obligatorisch. Wenn ein Datensatz nicht übereinstimmend ist, schreibt die Abfrage einen NULL-Wert für diesen Schlüssel. Da Verknüpfungen nur den Anfügemodus unterstützen, werden nicht übereinstimmende Datensätze erst geschrieben, wenn der Spätheitsschwellenwert überschritten wird.
Steuern des Schwellenwerts für verspätete Daten mit einer Richtlinie für mehrere Wasserzeichen
Bei mehreren strukturierten Streaming-Eingaben können Sie mehrere Wasserzeichen festlegen, um Toleranzschwellenwerte für verspätete Daten zu steuern. Mit Wasserzeichen können Sie Zustandsinformationen und Latenz steuern.
Eine Streamingabfrage kann mehrere Eingabestreams aufweisen, die vereinigt oder miteinander verbunden werden. Bei zustandsbehafteten Vorgängen kann jeder der Eingabedatenströme einen anderen Schwellenwert für verspätete Datentoleranz erfordern. Geben Sie diese Schwellenwerte mithilfe von withWatermark("eventTime", delay) bei jedem Eingabedatenstrom an. Im Folgenden ist eine Abfrage als Beispiel mit Stream-Stream-Joins dargestellt.
Python
input_stream1 = ... # delays up to 1 hour
input_stream2 = ... # delays up to 2 hours
(input_stream1.withWatermark("eventTime1", "1 hour")
.join(
input_stream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
)
Scala
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Beim Ausführen der Abfrage mit zustandsbehafteten Vorgängen verfolgt Structured Streaming einzeln die maximale Ereigniszeit für jeden Eingabedatenstrom, berechnet Wasserzeichen basierend auf der entsprechenden Verzögerung und bestimmt ein einzelnes globales Wasserzeichen. Standardmäßig verwendet Strukturiertes Streaming das Minimum als globales Wasserzeichen. Wenn ein Datenstrom hinter den anderen zurückbleibt, verhindert ein globales Mindest-Watermark, dass die Abfrage Daten versehentlich als verspätet kennzeichnet. Dies kann beispielsweise auftreten, wenn einer der Datenströme aufgrund vorgelagerter Fehler keine Daten mehr empfängt. Das globale Wasserzeichen bewegt sich sicher im Tempo des langsamsten Datenstroms und verzögert die Abfrageausgabe bei Bedarf.
Um die Latenz zu verringern, setzen Sie spark.sql.streaming.multipleWatermarkPolicy auf max (Standard ist min), um das Wasserzeichen des schnellsten Datenstroms als globales Wasserzeichen zu verwenden. Diese Konfiguration verwirft jedoch Daten aus den langsamsten Datenströmen. Databricks empfiehlt, diese Konfiguration mit Vorsicht anzuwenden.
Anwenden von Wasserzeichen auf unterschiedliche Vorgänge
Die distinct Operation erfasst jeden eindeutigen Datensatz im Zustand. Ohne Wasserzeichen wächst der Zustand unbegrenzt weiter und kann Speicherprobleme verursachen. Geben Sie ein Wasserzeichen für ein Zeitstempelfeld an, um den gebundenen Zustand zu definieren, und entfernen Sie alte Datensätze, nachdem der Schwellenwert überschritten wurde.
Im folgenden Beispiel wird ein Wasserzeichen auf einen distinct Vorgang angewendet:
Python
streamingDf = spark.readStream. ... # columns: eventTime, id, value, ...
# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)
Scala
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...
// Apply watermark before distinct operation
streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
In diesem Beispiel entfernt die Streamingabfrage doppelte Datensätze, die innerhalb von 1 Stunde nach dem zuletzt beobachteten eventTime eintreffen. Die Abfrage verwirft Zustandsinformationen zur Deduplizierung, sobald der Schwellenwert überschritten ist.
Wichtig
Um bestimmte Spalten anstelle aller Spalten zu deduplizieren, verwenden dropDuplicates() Oder dropDuplicatesWithinWatermark() anstelle von distinct. Weitere Informationen finden Sie unter Löschen von Duplikaten innerhalb eines Wasserzeichens.
Duplikate innerhalb des Wasserzeichens löschen
In Databricks Runtime 13.3 LTS oder höher können Sie einen eindeutigen Bezeichner verwenden, um Datensätze innerhalb eines Wasserzeichenschwellenwerts zu deduplizieren.
Strukturiertes Streaming garantiert eine Genau-einmal-Verarbeitung, entfernt jedoch keine Duplikate aus den Datenquellen. Verwenden Sie dropDuplicatesWithinWatermark, um Duplikate in einem beliebigen Feld zu entfernen, auch wenn sich Felder in duplizierten Datensätzen unterscheiden, z. B. die Ereigniszeit oder die Ankunftszeit.
Bei dropDuplicatesWithinWatermarkAbfragen werden Datensätze, die innerhalb des Wasserzeichenschwellenwerts eingehen, immer dedupliziert. Abfragen können auch Datensätze deduplizieren, die außerhalb des Schwellenwerts eingehen, aber dies ist nicht garantiert. Um sicherzustellen, dass Abfragen alle Duplikate ablegen, legen Sie den Wasserzeichenschwellenwert so fest, dass er größer als der maximale Zeitstempelunterschied zwischen doppelten Ereignissen ist.
Sie müssen ein Wasserzeichen angeben, das die dropDuplicatesWithinWatermark Methode verwenden soll:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(Seq("guid"))
Anwendungsfallbeispiele
Die folgenden Beispiele zeigen erweiterte Anwendungsfälle für Fensterfenster:
Verwenden von Sturzfenstern zum Berechnen von Stundenumsatzsummen
Tumbling-Fenster haben eine feste Größe und nicht überlappende Intervalle. Jede Eingabezeile gehört zu genau einem Fenster. Verwenden Sie Tumblingfenster, um diskrete Zeitperiodenaggregationen zu berechnen, z. B. Stundenumsatzsummen:
Python
from pyspark.sql.functions import window, sum
hourly_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val hourlySales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
In diesem Beispiel:
-
window("timestamp", "1 hour")gruppiert Bestellungen in nicht überlappenden 1-Stunden-Intervallen, z. B. 5 bis 6 Uhr und 6 bis 7 Uhr. -
withWatermark("timestamp", "1 hour")behält das Aggregat jedes Fensters im Status bei, bis der Endzeitstempel des Fensters 1 Stunde älter ist als der maximale Order-Zeitstempel.
Verwenden Sie Gleitfenster zur Berechnung gleitender Aggregate
Gleitfenster haben eine feste Größe, und Intervalle können sich überlappen. Eine einzelne Zeile kann zu mehreren Fenstern gehören. Verwenden Sie gleitende Fenster, um laufende Aggregationen zu berechnen, z. B. Verkäufe über einen gleitenden Zeitraum von 6 Stunden:
Python
from pyspark.sql.functions import window, sum
rolling_sales = (orders
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
.agg(sum("amount").alias("total_sales"))
)
Scala
import org.apache.spark.sql.functions.{window, sum}
val rollingSales = orders
.withWatermark("timestamp", "1 hour")
.groupBy(window($"timestamp", "6 hours", "1 hour"))
.agg(sum($"amount").alias("total_sales"))
In diesem Beispiel:
-
window("timestamp", "6 hours", slideDuration="1 hour")gruppiert Bestellungen in 6-Stunden-Intervallen, die sich jeweils um 1 Stunde verschieben, z. B. 5 bis 11 Uhr und 6 bis 12 Uhr. -
withWatermark("timestamp", "1 hour")hält das Aggregat jedes Fensters im Zustand, bis der Endzeitstempel des Fensters eine Stunde älter ist als der maximale Zeitstempel der Bestellung. -
slideDurationmuss kleiner oder gleich groß wiewindowDurationsein.
Verwenden von Sitzungsfenstern zum Überprüfen von Benutzeraktivitäten
Sitzungsfenster haben keine feste Größe. Ein Fenster öffnet sich, wenn eine Zeile eintrifft, und schließt sich nach Ablauf eines Zeitraums, in dem keine neuen Zeilen eintreffen. Verwenden Sie Sitzungsfenster, um Aktivitätsbrüche zwischen langen Leerlaufzeiten zu aggregieren, z. B. seitenansichten eines Benutzers innerhalb eines Zeitraums von 30 Minuten:
Python
from pyspark.sql.functions import session_window, sum
sessionized_page_views = (activity
.withWatermark("timestamp", "1 hour")
.groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
.agg(sum("page_views").alias("total_page_views"))
)
Scala
import org.apache.spark.sql.functions.{session_window, sum}
val sessionizedPageViews = activity
.withWatermark("timestamp", "1 hour")
.groupBy($"user_id", session_window($"timestamp", "30 minutes"))
.agg(sum($"page_views").alias("total_page_views"))
In diesem Beispiel:
-
session_window("timestamp", gapDuration="30 minutes")öffnet ein Fenster, wenn die erste Seitenansicht eingeht. Jede nachfolgende Seitenansicht, die innerhalb von 30 Minuten eintrifft, erweitert das Fenster. Wenn innerhalb von 30 Minuten kein Seitenaufruf erfolgt, schließt sich das Zeitfenster, und der nächste Seitenaufruf startet ein neues Zeitfenster. -
withWatermark("timestamp", "1 hour")hält das Aggregat jeder Sitzung im Status, bis der Zeitstempel des Fensterendes 1 Stunde älter ist als der maximale Zeitstempel eines Seitenaufrufs. - Das
timeColumn-Argument fürwindow()undsession_window()muss vom TypTimestampTypeoderTimestampNTZTypesein. - Dient
current_timestamp()zum Definieren von Fenstern basierend auf der Verarbeitungszeit anstelle der Ereigniszeit. - Sie können Fensterdauern von Mikrosekunden bis zu Tagen festlegen. Zeitspannen von einem Monat und länger werden nicht unterstützt.
- Verwenden Sie den
complete-Ausgabemodus mit fensterbasierten Aggregationen, um den vollständigen Fensterzustand unbegrenzt beizubehalten. Verwenden Sieappendden Ausgabemodus mit einem geeigneten Wasserzeichen, um das Zustandswachstum zu binden und Speicherprobleme für große Datasets zu verhindern. Weitere Informationen zum Verhalten des Ausgabemodus finden Sie unter Watermarks und Ausgabemodus für fensterbasierte Aggregationen.