Anwenden von Wasserzeichen zum Steuern von Datenverarbeitungsschwellenwerten

In diesem Artikel werden die grundlegenden Konzepte des Wasserzeichens vorgestellt und Empfehlungen für die Verwendung von Wasserzeichen in gängigen zustandsbehafteten Streamingvorgängen bereitgestellt. Sie müssen Wasserzeichen auf zustandsbehaftete Streamingvorgänge anwenden, um zu vermeiden, dass die Menge der im Zustand gespeicherten Daten unendlich erweitert wird. Dies kann nämlich zu Speicherproblemen führen und die Verarbeitungslatenz bei Streamingvorgängen mit langer Ausführungsdauer erhöhen.

Was ist ein Wasserzeichen?

Strukturiertes Streaming verwendet Wasserzeichen, um den Schwellenwert für die Dauer der Verarbeitung von Updates für eine bestimmte Zustandsentität zu steuern. Zu den gängigen Beispielen für Zustandsentitäten gehören:

  • Aggregationen über ein Zeitfenster.
  • Eindeutige Schlüssel in einer Verknüpfung zwischen zwei Streams.

Wenn Sie ein Wasserzeichen deklarieren, geben Sie ein Zeitstempelfeld und einen Wasserzeichenschwellenwert für einen Streaming-DataFrame an. Wenn neue Daten eintreffen, verfolgt der Zustandsmanager den letzten Zeitstempel im angegebenen Feld und verarbeitet alle Datensätze innerhalb des Verspätungsschwellenwerts.

Im folgenden Beispiel wird ein Wasserzeichenschwellenwert von 10 Minuten auf eine Anzahl an Fenstern angewendet:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

In diesem Beispiel:

  • Die Spalte event_time wird verwendet, um ein Wasserzeichen von 10 Minuten und ein rollierendes Fenster von 5 Minuten zu definieren.
  • Für jede beobachtete id wird eine Anzahl während jedes nicht überlappenden 5-Minuten-Fensters gesammelt.
  • Zustandsinformationen werden für jede Anzahl beibehalten, bis das Ende des Fensters 10 Minuten älter ist als die zuletzt beobachtete event_time.

Wichtig

Wasserzeichen-Schwellenwerte garantieren, dass Datensätze, die innerhalb des angegebenen Schwellenwerts eintreffen, gemäß der Semantik der definierten Abfrage verarbeitet werden. Verspätet eintreffende Datensätze, die außerhalb des angegebenen Schwellenwerts liegen, werden möglicherweise noch mit Abfragemetriken verarbeitet, was jedoch nicht garantiert ist.

Wie wirken sich Wasserzeichen auf die Verarbeitungszeit und den Durchsatz aus?

Wasserzeichen interagieren mit Ausgabemodi, um zu steuern, wann Daten in die Senke geschrieben werden. Da Wasserzeichen die Gesamtmenge der zu verarbeitenden Zustandsinformationen reduzieren, ist eine effektive Verwendung von Wasserzeichen für einen effizienten zustandsbehafteten Streamingdurchsatz unerlässlich.

Hinweis

Nicht alle Ausgabemodi werden für alle zustandsbehafteten Vorgänge unterstützt.

Wasserzeichen und Ausgabemodus für Aggregationen im Fenstermodus

In der folgenden Tabelle wird die Verarbeitung von Abfragen mit Aggregation auf einem Zeitstempel mit einem definierten Wasserzeichen beschrieben:

Ausgabemodus Verhalten
Anfügen Zeilen werden in die Zieltabelle geschrieben, sobald der Wasserzeichenschwellenwert überschritten wurde. Alle Schreibvorgänge werden basierend auf dem Schwellenwert für Verspätungen verzögert. Der alte Aggregationszustand wird gelöscht, sobald der Schwellenwert überschritten wurde.
Aktualisieren Zeilen werden beim Berechnen der Ergebnisse in die Zieltabelle geschrieben und können aktualisiert und überschrieben werden, wenn neue Daten eintreffen. Der alte Aggregationszustand wird gelöscht, sobald der Schwellenwert überschritten wurde.
Abgeschlossen Der Aggregationszustand wird nicht gelöscht. Die Zieltabelle wird mit jedem Trigger neu geschrieben.

Wasserzeichen und Ausgabe für Stream-Stream-Verknüpfungen

Verknüpfungen zwischen mehreren Datenströmen unterstützen nur den Anfügemodus, und übereinstimmende Datensätze werden in jeden Batch geschrieben, in dem sie ermittelt wurden. Für innere Verknüpfungen empfiehlt Databricks, einen Wasserzeichenschwellenwert für jede Streamingdatenquelle festzulegen. Dadurch können Zustandsinformationen für alte Datensätze verworfen werden. Ohne Wasserzeichen versucht strukturiertes Streaming, jeden Schlüssel von beiden Seiten der Verknüpfung mit jedem Trigger zu verknüpfen.

Strukturiertes Streaming verfügt über eine spezielle Semantik, um äußere Verknüpfungen zu unterstützen. Wasserzeichen sind für äußere Verknüpfungen obligatorisch. Sie geben an, wann ein Schlüssel mit einem NULL-Wert geschrieben werden muss, nachdem keine Übereinstimmungen dafür gefunden wurden. Äußere Verknüpfungen können zwar nützlich für die Aufzeichnung von Datensätzen sein, für die es während der Datenverarbeitung keine Übereinstimmungen gab (da Verknüpfungen nur als Anfügevorgänge in Tabellen geschrieben werden). Diese fehlenden Daten werden jedoch erst aufgezeichnet, nachdem der Schwellenwert für die Verspätung überschritten wurde.

Steuern des Schwellenwerts für verspätete Daten mit mehreren Wasserzeichenrichtlinien beim strukturierten Streaming

Wenn Sie mit mehreren strukturierten Streaming-Eingaben arbeiten, können Sie mehrere Grenzwerte festlegen, um Toleranzschwellenwerte für spät ankommende Daten zu steuern. Durch das Konfigurieren von Grenzwerten können Sie Informationen zu Steuerelementzuständen steuern und die Wartezeit beeinflussen.

Eine Streamingabfrage kann mehrere Eingabestreams aufweisen, die vereinigt oder miteinander verbunden werden. Jeder Eingabestream kann über einen anderen Schwellenwert für verspätete Daten verfügen, der für zustandsbehaftete Vorgänge toleriert werden muss. Geben Sie diese Schwellenwerte mit withWatermarks("eventTime", delay) für jeden der Eingabestreams an. Im Folgenden finden Sie eine Beispielabfrage mit Stream-Stream-Joins.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

Beim Ausführen der Abfrage führt das strukturierte Streaming die maximale Ereigniszeit jedes Eingabestreams einzeln nach, berechnet Grenzwerte basierend auf der entsprechenden Verzögerung und wählt einen einzelnen globalen Grenzwert aus, das für zustandsbehaftete Vorgänge verwendet werden soll. Standardmäßig wird der Mindestwert als globales Wasserzeichen ausgewählt, da dadurch sichergestellt wird, dass keine Daten versehentlich als zu spät gelöscht werden, wenn einer der Streams hinter den anderen zurückliegt (z. B. wenn einer der Streams aufgrund von vorgelagerten Fehlern keine Daten mehr empfängt). Anders ausgedrückt: Der globale Grenzwert bewegt sich sicher mit der Geschwindigkeit des langsamsten Streams, und die Abfrageausgabe verzögert sich entsprechend.

Wenn Sie schnellere Ergebnisse erzielen möchten, können Sie die Richtlinie für mehrere Grenzwerte so festlegen, dass der Höchstwert als globaler Grenzwert ausgewählt wird, indem Sie die SQL-Konfiguration spark.sql.streaming.multipleWatermarkPolicy auf max setzen (Standardeinstellung ist min). Dadurch kann sich das globale Wasserzeichen mit der Geschwindigkeit des schnellsten Streams bewegen. Diese Konfiguration verwirft jedoch Daten aus den langsamsten Datenströmen. Databricks empfiehlt daher, diese Konfiguration mit Bedacht zu verwenden.

Duplikate innerhalb des Wasserzeichens löschen

In Databricks Runtime 13.3 LTS und höher können Sie Datensätze innerhalb eines Wasserzeichen-Schwellenwerts anhand eines eindeutigen Bezeichners deduplizieren.

Strukturiertes Streaming bietet Garantien für eine einmalige Verarbeitung, führt aber keine automatische Deduplizierung von Datensätzen aus Datenquellen durch. Sie können dropDuplicatesWithinWatermark verwenden, um Datensätze für jedes angegebene Feld zu deduplizieren, sodass Sie Duplikate aus einem Stream entfernen können, auch wenn sich einige Felder unterscheiden (z. B. Ereigniszeit oder Ankunftszeit).

Doppelte Datensätze, die innerhalb des angegebenen Wasserzeichens eintreffen, werden garantiert gelöscht. Diese Garantie gilt nur in eine Richtung. Doppelte Datensätze, die außerhalb des festgelegten Schwellenwerts eintreffen, werden möglicherweise ebenfalls gelöscht. Sie müssen den Schwellenwert für die Verzögerung des Wasserzeichens länger als die maximale Zeitstempeldifferenz zwischen den duplizierten Ereignissen einstellen, um alle Duplikate zu entfernen.

Sie müssen ein Wasserzeichen angeben, um die dropDuplicatesWithinWatermark-Methode zu verwenden, wie im folgenden Beispiel:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")