Apply watermarks to control data processing thresholds

This page describes watermarking concepts and has recommendations for using watermarks in common stateful streaming operations.

Streaming queries accumulate state data over time. Watermarks let Structured Streaming automatically remove old state data, preventing out-of-memory errors and growing processing latency.

What is a watermark?

During processing, Structured Streaming keeps state across micro-batches. Streaming queries use state to incrementally update results instead of recomputing everything after each micro-batch. Watermarks set the threshold that determines when a query stops updating a state entity and drops it from state.

Common examples of state entities include:

  • Aggregations over a time window.
  • Unique keys in a join between two streams.

To declare a watermark on a streaming DataFrame, specify a timestamp field and a lateness threshold. As new data arrives, the state manager tracks the most recent timestamp in the specified field and uses the lateness threshold to decide which records to process.

Queries always process records that arrive within the threshold. Queries might still process records that arrive outside the threshold, but this isn't guaranteed.

The following example applies a 10 minute watermark threshold to a windowed count:

Python

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

Scala

import org.apache.spark.sql.functions.window

df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()

In this example:

  • The event_time column is used to define a 10 minute watermark and a 5 minute tumbling window.
  • A count is collected for each id observed for each non-overlapping 5 minute window.
  • State information is maintained for each count until the end of the window is 10 minutes older than the latest observed event_time.

Important

In a groupBy() and window() operation, reference columns by name, as in "<colName>" or col("<colName>"), to ensure the event time marker is preserved. In Scala, you can also use $"<colName>".
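
For example, the following Python sketch repeats the windowed count from above using col() references, which preserve the marker the same way plain string names do:

Python

from pyspark.sql.functions import col, window

# Both string names and col() references preserve the event time marker
(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window(col("event_time"), "5 minutes"),
    col("id"))
  .count()
)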

How do watermarks affect processing time and throughput?

Output modes control when a query with watermarks writes data to the sink. Watermarks are essential for throughput control in stateful streaming because they reduce the total amount of state information in memory. Not all output modes are supported for all stateful operations. See Watermarks and output mode for windowed aggregations.

Selecting a watermark duration has tradeoffs:

  • Shorter watermarks lower query latency because queries store less state information and write results after each watermark duration completes. However, short watermarks have low tolerance for late data.
  • Longer watermarks have a high tolerance for late data. However, long watermarks increase query latency because queries must store more state information and wait to write results after a longer watermark duration.
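
To see where the watermark of a running query currently stands, you can inspect the query's progress metrics. The following sketch assumes query is the StreamingQuery handle returned by writeStream.start(); the eventTime entry only carries a watermark value when the query defines one:

Python

# Read the current global watermark from the most recent progress report
progress = query.lastProgress
if progress and "eventTime" in progress:
    print(progress["eventTime"].get("watermark"))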

Watermarks and output mode for windowed aggregations

The output mode determines processing behavior for queries with aggregation on a timestamp and a watermark:

  • Append: The query writes rows to the target table only after the watermark threshold has passed. All writes are delayed based on the lateness threshold. Old aggregation state is dropped after the threshold has passed.
  • Update: The query writes rows to the target table as results are calculated, and the query can update and overwrite rows as new data arrives. Old aggregation state is dropped after the threshold has passed.
  • Complete: Aggregation state isn't dropped. The query rewrites the target table for each trigger.
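
You declare the output mode on the write. The following sketch writes the windowed count from the earlier example in append mode; the windowed_counts name, checkpoint path, and table name are placeholders:

Python

# Write the windowed aggregation in append mode (names and path are placeholders)
(windowed_counts.writeStream
  .outputMode("append")  # or "update" / "complete"
  .option("checkpointLocation", "/tmp/checkpoints/windowed_counts")
  .toTable("windowed_counts_table")
)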

Watermarks and output modes for stream-stream joins

Joins between multiple streams support only append mode. Queries write matched records to the sink in each batch.

For inner joins, Databricks recommends that you set a watermark threshold on each streaming data source to allow the query to discard state information for old records. Without watermarks, Structured Streaming attempts to join every key from both sides of the join on each trigger, which might affect performance.

For outer joins, watermarks are mandatory. When a record goes unmatched, the query writes it with null values for the columns from the other side of the join. Because joins support only append mode, unmatched records aren't written until after the lateness threshold passes.
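
The following sketch shows a left outer join between two hypothetical streams, impressions and clicks, with a watermark on each side and a time-range condition so the query can bound the state it keeps for unmatched records:

Python

from pyspark.sql.functions import expr

# Hypothetical streams with columns impressionAdId/impressionTime and clickAdId/clickTime
impressions_with_watermark = impressions.withWatermark("impressionTime", "2 hours")
clicks_with_watermark = clicks.withWatermark("clickTime", "3 hours")

(impressions_with_watermark.join(
    clicks_with_watermark,
    expr("""
      clickAdId = impressionAdId AND
      clickTime >= impressionTime AND
      clickTime <= impressionTime + interval 1 hour
    """),
    "leftOuter")  # unmatched impressions are emitted with nulls after the threshold passes
)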

Control late data threshold with a multiple watermarks policy

For multiple Structured Streaming inputs, you can set multiple watermarks to control tolerance thresholds for late data. Watermarks allow you to control state information and latency.

A streaming query can have multiple input streams that are unioned or joined together. For stateful operations, each of the input streams might require a different threshold for late data tolerance. Specify these thresholds using withWatermark("eventTime", delay) on each input stream. The following is an example query with stream-stream joins.

Python

input_stream1 = ...      # delays up to 1 hour
input_stream2 = ...      # delays up to 2 hours

(input_stream1.withWatermark("eventTime1", "1 hour")
  .join(
    input_stream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)
)

Scala

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

While running the query with stateful operations, Structured Streaming individually tracks the maximum event time for each input stream, calculates watermarks based on the corresponding delay, and determines a single global watermark. By default, Structured Streaming uses the minimum as the global watermark. If a stream falls behind the others, a minimum global watermark prevents the query from accidentally marking data as late. For example, this might occur when one of the streams stops receiving data because of upstream failures. The global watermark safely moves at the pace of the slowest stream and delays the query output when necessary.

To reduce latency, set spark.sql.streaming.multipleWatermarkPolicy to max (default is min) to use the fastest stream's watermark as the global watermark. However, this configuration causes data from the slowest streams to be dropped as late. Databricks recommends that you apply this configuration with caution.
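
For example, the following one-liner applies the policy to the active session:

Python

# Use the fastest stream's watermark as the global watermark (default is "min")
spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")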

Apply watermarks to distinct operations

The distinct operation tracks every unique record in state. Without a watermark, state grows indefinitely and can cause memory issues. Specify a watermark on a timestamp field to bound state and remove old records after the threshold passes.

The following example applies a watermark to a distinct operation:

Python

streamingDf = spark.readStream. ...  # columns: eventTime, id, value, ...

# Apply watermark before distinct operation
(streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()
)

Scala

val streamingDf = spark.readStream. ...  // columns: eventTime, id, value, ...

// Apply watermark before distinct operation
streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()

In this example, the streaming query removes duplicate records that arrive within 1 hour of the latest observed eventTime. The query drops state information for deduplication after the threshold passes.

Important

To deduplicate specific columns instead of all columns, use dropDuplicates() or dropDuplicatesWithinWatermark() instead of distinct(). See Drop duplicates within watermark.
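
For example, the following sketch deduplicates on a subset of columns with dropDuplicates(). Including the watermarked column in the subset lets the query drop old deduplication state; the column names are hypothetical:

Python

# Deduplicate on user_id plus the watermarked eventTime column (hypothetical columns)
(streamingDf
  .withWatermark("eventTime", "1 hour")
  .dropDuplicates(["user_id", "eventTime"])
)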

Drop duplicates within watermark

In Databricks Runtime 13.3 LTS or above, you can use a unique identifier to deduplicate records within a watermark threshold.

Structured Streaming guarantees exactly-once processing but doesn't deduplicate records from data sources. Use dropDuplicatesWithinWatermark to deduplicate records on specified fields, even when other fields, such as event time or arrival time, differ across duplicate records.

With dropDuplicatesWithinWatermark, queries always deduplicate records that arrive within the watermark threshold. Queries might also deduplicate records that arrive outside the threshold, but this isn't guaranteed. To guarantee queries drop all duplicates, set the watermark threshold to be greater than the maximum timestamp difference between duplicate events.

You must specify a watermark to use the dropDuplicatesWithinWatermark method:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(Seq("guid"))

Use case examples

The following examples show advanced windowing use cases:

Use tumbling windows to calculate hourly sales totals

Tumbling windows are fixed-size with non-overlapping intervals. Each input row belongs to exactly one window. Use tumbling windows to compute discrete time-period aggregations, such as hourly sales totals:

Python

from pyspark.sql.functions import window, sum

hourly_sales = (orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "1 hour"))
  .agg(sum("amount").alias("total_sales"))
)

Scala

import org.apache.spark.sql.functions.{window, sum}

val hourlySales = orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "1 hour"))
  .agg(sum($"amount").alias("total_sales"))

In this example:

  • window("timestamp", "1 hour") groups orders into non-overlapping 1-hour intervals, such as 5 to 6 AM and 6 to 7 AM.
  • withWatermark("timestamp", "1 hour") keeps each window's aggregate in state until the window end timestamp is 1 hour older than the maximum order timestamp.

Use sliding windows to calculate rolling aggregates

Sliding windows are fixed-size with intervals that can overlap. A single row can belong to multiple windows. Use sliding windows to compute rolling aggregates, such as sales over a rolling 6-hour period:

Python

from pyspark.sql.functions import window, sum

rolling_sales = (orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
  .agg(sum("amount").alias("total_sales"))
)

Scala

import org.apache.spark.sql.functions.{window, sum}

val rollingSales = orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "6 hours", "1 hour"))
  .agg(sum($"amount").alias("total_sales"))

In this example:

  • window("timestamp", "6 hours", slideDuration="1 hour") groups orders into 6-hour intervals that advance by 1 hour, for example, 5 to 11 AM and 6 AM to 12 PM.
  • withWatermark("timestamp", "1 hour") keeps each window's aggregate in state until the window end timestamp is 1 hour older than the maximum order timestamp.
  • slideDuration must be less than or equal to the windowDuration.

Use session windows to check user activity

Session windows have no fixed size. A window opens when a row arrives and closes after a gap duration that contains no new rows. Use session windows to aggregate activity bursts between long idle periods, such as a user's page views within a 30-minute period:

Python

from pyspark.sql.functions import session_window, sum

sessionized_page_views = (activity
  .withWatermark("timestamp", "1 hour")
  .groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
  .agg(sum("page_views").alias("total_page_views"))
)

Scala

import org.apache.spark.sql.functions.{session_window, sum}

val sessionizedPageViews = activity
  .withWatermark("timestamp", "1 hour")
  .groupBy($"user_id", session_window($"timestamp", "30 minutes"))
  .agg(sum($"page_views").alias("total_page_views"))

In this example:

  • session_window("timestamp", gapDuration="30 minutes") opens a window when the first page view arrives. Each subsequent page view that arrives within 30 minutes extends the window. When no page view arrives within 30 minutes, the window closes and the next page view starts a new window.
  • withWatermark("timestamp", "1 hour") keeps each session's aggregate in state until the window end timestamp is 1 hour older than the maximum page view timestamp.
  • The timeColumn argument for window() and session_window() must be of TimestampType or TimestampNTZType.
  • Use current_timestamp() to define windows based on processing time rather than event time, as shown in the sketch after this list.
  • You can set window durations from microseconds up to weeks. Month and longer durations are not supported.
  • Use complete output mode with windowed aggregations to keep all window state indefinitely. Use append output mode with an appropriate watermark to bound state growth and prevent memory issues for large data sets. For more detail on output mode behavior, see Watermarks and output mode for windowed aggregations.
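
For example, the following sketch windows on processing time by stamping each row with current_timestamp(); the events stream is hypothetical:

Python

from pyspark.sql.functions import current_timestamp, window

# Window on arrival time instead of an event time column
(events
  .withColumn("processing_time", current_timestamp())
  .groupBy(window("processing_time", "5 minutes"))
  .count()
)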