

Apply watermarks to control data processing thresholds

This page describes the basic concepts of watermarking and provides recommendations for using watermarks in common stateful streaming operations. You must apply watermarks to stateful streaming operations to avoid infinitely expanding the amount of data kept in state, which can introduce memory issues or increase processing latencies during long-running streaming operations.

What is a watermark?

Structured Streaming uses watermarks to control the threshold for how long to continue processing updates for a given state entity. Common examples of state entities include:

  • Aggregations over a time window.
  • Unique keys in a join between two streams.

When you declare a watermark, you specify a timestamp field and a watermark threshold on a streaming DataFrame. As new data arrives, the state manager tracks the most recent timestamp in the specified field and processes all records within the lateness threshold.

The following example applies a 10-minute watermark threshold to a windowed count:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

In this example:

  • The event_time column is used to define a 10-minute watermark and a 5-minute tumbling window.
  • A count is collected for each id observed in each non-overlapping 5-minute window.
  • State information is maintained for each count until the end of the window is 10 minutes older than the latest observed event_time.

Important

Watermark thresholds guarantee that records arriving within the specified threshold are processed according to the semantics of the defined query. Records arriving outside the specified threshold might still be processed, but this isn't guaranteed. You can observe records dropped as too late using query metrics.
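
For example, the following minimal Python sketch inspects watermark-related progress metrics; it assumes query is the StreamingQuery handle returned by writeStream.start(), and metric availability depends on your Spark version:

# Inspect the most recent progress of a running query; `query` is assumed
# to be a StreamingQuery handle. In PySpark, lastProgress is a dict.
progress = query.lastProgress
if progress is not None:
    # Current global watermark for the query, when a watermark is defined.
    print(progress.get("eventTime", {}).get("watermark"))
    for op in progress["stateOperators"]:
        # Rows discarded because they arrived behind the watermark.
        print(op.get("numRowsDroppedByWatermark"))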

How do watermarks impact processing time and throughput?

Watermarks interact with output modes to control when data is written to the sink. Because watermarks reduce the total amount of state information to be processed, effective use of watermarks is essential for efficient stateful streaming throughput.

Note

Not all output modes are supported for all stateful operations.

Watermarks and output mode for windowed aggregations

The following list details processing behavior for queries with aggregation on a timestamp with a watermark defined, by output mode:

  • Append: Rows are written to the target table only after the watermark threshold has passed, so all writes are delayed based on the lateness threshold. Old aggregation state is dropped after the threshold has passed.
  • Update: Rows are written to the target table as results are calculated, and can be updated and overwritten as new data arrives. Old aggregation state is dropped after the threshold has passed.
  • Complete: Aggregation state isn't dropped. The target table is rewritten with each trigger.
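
For example, a minimal Python sketch of starting the earlier windowed count in append mode; the variable name, checkpoint location, and target table name are hypothetical placeholders:

from pyspark.sql.functions import window

# Windowed count from the earlier example, assigned to a variable.
windowed_counts = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(window("event_time", "5 minutes"), "id")
  .count()
)

# Start the query in append mode: a window's row is emitted only after the
# watermark passes the end of that window. Paths and names are hypothetical.
(windowed_counts.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/checkpoints/windowed_counts")
  .toTable("windowed_counts")
)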

Watermarks and output for stream-stream joins

Joins between multiple streams only support append mode, and matched records are written in the batch in which they are discovered. For inner joins, Databricks recommends setting a watermark threshold on each streaming data source. This allows state information to be discarded for old records. Without watermarks, Structured Streaming attempts to join every key from both sides of the join with each trigger.
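
The following is a minimal Python sketch of this pattern; the source tables and column names (impressionTime, clickTime, clickAdId, impressionAdId) are illustrative assumptions:

from pyspark.sql.functions import expr

# Hypothetical streaming sources; each gets its own watermark so old join
# state can be discarded.
impressions = spark.readStream.table("impressions")
clicks = spark.readStream.table("clicks")

# Inner join: only matched records are written, in the batch where the
# match is discovered.
(impressions.withWatermark("impressionTime", "2 hours")
  .join(
    clicks.withWatermark("clickTime", "3 hours"),
    expr("clickAdId = impressionAdId"))
)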

Structured Streaming has special semantics to support outer joins. Watermarking is mandatory for outer joins, as it indicates when a key must be written with a null value after going unmatched. While outer joins can be useful for recording records that are never matched during data processing, joins only write to tables as append operations, so this missing data isn't recorded until after the lateness threshold has passed.
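
Reusing the hypothetical impressions and clicks streams from the sketch above, a left outer join also needs a time-range condition so the engine knows when an unmatched impression can be emitted with null click columns; the one-hour range is an illustrative assumption:

from pyspark.sql.functions import expr

# Left outer join: unmatched impressions are emitted with nulls only after
# the watermark passes the end of the allowed match window.
(impressions.withWatermark("impressionTime", "2 hours")
  .join(
    clicks.withWatermark("clickTime", "3 hours"),
    expr("""
      clickAdId = impressionAdId AND
      clickTime >= impressionTime AND
      clickTime <= impressionTime + interval 1 hour
    """),
    "leftOuter")
)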

Control late data threshold with multiple watermark policy in Structured Streaming

When working with multiple Structured Streaming inputs, you can set multiple watermarks to control tolerance thresholds for late-arriving data. Configuring watermarks lets you control the amount of state information maintained and affects latency.

A streaming query can have multiple input streams that are unioned or joined together. Each of the input streams can have a different threshold of late data that needs to be tolerated for stateful operations. Specify these thresholds using withWatermark("eventTime", delay) on each of the input streams. The following is an example query with stream-stream joins.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

While running the query, Structured Streaming individually tracks the maximum event time seen in each input stream, calculates watermarks based on the corresponding delay, and chooses a single global watermark with them to be used for stateful operations. By default, the minimum is chosen as the global watermark because it prevents data from being accidentally dropped as too late if one of the streams falls behind the others (for example, one of the streams stops receiving data due to upstream failures). In other words, the global watermark safely moves at the pace of the slowest stream and the query output is delayed accordingly.

If you want to get faster results, you can set the multiple watermark policy to choose the maximum value as the global watermark by setting the SQL configuration spark.sql.streaming.multipleWatermarkPolicy to max (default is min). This lets the global watermark move at the pace of the fastest stream. However, this configuration drops data from the slowest streams. Databricks recommends using this configuration judiciously.
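
For example, in Python:

# Let the global watermark follow the fastest stream (default is "min").
# Caution: data from slower streams may be dropped as too late.
spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")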

Apply watermarks to distinct operations

The distinct operation is a stateful operator that requires watermarks to prevent unbounded state growth. Without watermarks, Structured Streaming attempts to track every unique record indefinitely, which can lead to memory issues or increased processing latencies.

When you apply distinct to a streaming DataFrame, you must specify a watermark on a timestamp field. The watermark controls how long the state manager maintains records for deduplication. After the watermark threshold passes, old records are removed from state.

The following example applies a watermark to a distinct operation:

Python

streamingDf = spark.readStream. ...  # columns: eventTime, id, value, ...

# Apply watermark before distinct operation
(streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()
)

Scala

val streamingDf = spark.readStream. ...  // columns: eventTime, id, value, ...

// Apply watermark before distinct operation
streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()

In this example, duplicate records arriving within 1 hour of the latest observed eventTime are removed from the stream. State information for deduplication is dropped after the threshold passes.

Important

If you need to deduplicate on specific columns rather than all columns, use dropDuplicates() or dropDuplicatesWithinWatermark() instead of distinct. See the next section for details.
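
For example, a minimal Python sketch that deduplicates on a subset of columns, assuming the same streamingDf and column names as above; including the event-time column lets the watermark clean up deduplication state:

# Deduplicate on guid and eventTime only; the watermark bounds how long
# deduplication state is kept.
(streamingDf
  .withWatermark("eventTime", "1 hour")
  .dropDuplicates(["guid", "eventTime"])
)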

Drop duplicates within watermark

In Databricks Runtime 13.3 LTS or later, you can deduplicate records within a watermark threshold using a unique identifier.

Structured Streaming provides exactly-once processing guarantees, but doesn't automatically deduplicate records from data sources. You can use dropDuplicatesWithinWatermark to deduplicate records on any specified field, allowing you to remove duplicates from a stream even if some fields differ (such as event time or arrival time).

Duplicate records that arrive within the specified watermark are guaranteed to be dropped. This guarantee is strict in only one direction: duplicate records that arrive outside of the specified threshold might also be dropped. To remove all duplicates, you must set the watermark delay threshold to be longer than the maximum timestamp difference among duplicate events.

You must specify a watermark to use the dropDuplicatesWithinWatermark method, as in the following example:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(Seq("guid"))
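
To start a deduplicated stream, write it to a sink. The following Python sketch uses a hypothetical checkpoint location and target table name:

# Start the deduplication query; the checkpoint path and table name are
# hypothetical placeholders.
deduped = (streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

(deduped.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/checkpoints/dedup")
  .toTable("deduped_events")
)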