This page describes watermarking concepts and has recommendations for using watermarks in common stateful streaming operations.
Streaming queries accumulate state data over time. Watermarks define a threshold that lets a query automatically remove old state data, preventing out-of-memory errors and increased processing latency.
What is a watermark?
During processing, Structured Streaming keeps state across micro-batches. Streaming queries use state to incrementally update results instead of recomputing everything after each micro-batch. Watermarks set the threshold that determines when a query can stop maintaining state for an entity.
Common examples of state entities include:
- Aggregations over a time window.
- Unique keys in a join between two streams.
To declare a watermark on a streaming DataFrame, specify a timestamp field and a lateness threshold. As new data arrives, the state manager tracks the most recent timestamp in the specified field and processes only records within the lateness threshold.
Queries always process records that arrive within the threshold. Queries might still process records that arrive outside the threshold, but this isn't guaranteed.
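This acceptance rule can be illustrated with a small pure-Python sketch. This is conceptual only, not a Spark API; the function name and variables are hypothetical:

```python
from datetime import datetime, timedelta

def is_within_threshold(event_time, max_event_time, delay):
    """Conceptual sketch: a record is guaranteed to be processed when its
    timestamp is no older than the watermark, that is, the maximum observed
    event time minus the lateness threshold. Not a Spark API."""
    watermark = max_event_time - delay
    return event_time >= watermark

latest = datetime(2024, 1, 1, 12, 0)   # most recent event_time observed so far
delay = timedelta(minutes=10)          # watermark threshold

# A record 5 minutes late is within the threshold; one 15 minutes late is not.
print(is_within_threshold(datetime(2024, 1, 1, 11, 55), latest, delay))
print(is_within_threshold(datetime(2024, 1, 1, 11, 45), latest, delay))
```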
The following example applies a 10 minute watermark threshold to a windowed count:
Python

```python
from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)
```

Scala

```scala
import org.apache.spark.sql.functions.window

df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
```
In this example:
- The `event_time` column is used to define a 10 minute watermark and a 5 minute tumbling window.
- A count is collected for each `id` observed for each non-overlapping 5 minute window.
- State information is maintained for each count until the end of the window is 10 minutes older than the latest observed `event_time`.
Important
In a `groupBy()` and `window()` operation, reference columns by name, `"<colName>"` or `col("<colName>")`, to ensure the event time marker is preserved. In Scala, you can also use `$"<colName>"`.
How do watermarks affect processing time and throughput?
Output modes control when a query with watermarks writes data to the sink. Watermarks are essential for throughput control in stateful streaming because they reduce the total amount of state information in memory. Not all output modes are supported for all stateful operations. See Watermarks and output mode for windowed aggregations.
Selecting a watermark duration has tradeoffs:
- Shorter watermarks lower query latency because queries store less state information and write results after each watermark duration completes. However, short watermarks have low tolerance for late data.
- Longer watermarks have a high tolerance for late data. However, long watermarks increase query latency because queries must store more state information and wait to write results after a longer watermark duration.
Watermarks and output mode for windowed aggregations
The following table shows processing behavior for queries with aggregation on a timestamp and a watermark:
| Output mode | Behavior |
|---|---|
| Append | The query writes rows to the target table after the watermark threshold has passed. All writes are delayed based on the lateness threshold. Old aggregation state is dropped after the threshold has passed. |
| Update | The query writes rows to the target table as results are calculated, and the query can update and overwrite rows as new data arrives. Old aggregation state is dropped after the threshold has passed. |
| Complete | Aggregation state isn't dropped. The query rewrites the target table for each trigger. |
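To show where output mode is declared, the following hedged sketch writes the earlier windowed count in append mode. The checkpoint path and table name are placeholders, and `df` is assumed to be a streaming DataFrame; this is illustrative, not a configuration from this article:

```python
from pyspark.sql.functions import window

# Hypothetical sink configuration: append mode with a windowed aggregation.
# Rows for a window are written only after the watermark passes the window end.
(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(window("event_time", "5 minutes"), "id")
  .count()
  .writeStream
  .outputMode("append")  # or "update"; "complete" keeps all aggregation state
  .option("checkpointLocation", "/tmp/checkpoints/windowed_counts")  # placeholder path
  .toTable("windowed_counts")  # placeholder table name
)
```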
Watermarks and output modes for stream-stream joins
Joins between multiple streams only support append mode. Queries write matched records for each batch.
For inner joins, Databricks recommends that you set a watermark threshold on each streaming data source to allow the query to discard state information for old records. Without watermarks, Structured Streaming attempts to join every key from both sides of the join on each trigger, which might affect performance.
For outer joins, watermarking is mandatory. When a record is unmatched, the query writes a null value for that key. Because joins only support append mode, unmatched records aren't written until the lateness threshold passes.
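A left outer join with watermarks on both sides might look like the following sketch. The stream sources, column names, and the one-hour time-range join condition are illustrative assumptions, not values from this article:

```python
from pyspark.sql.functions import expr

# Hypothetical streams: ad impressions and clicks, each with its own event time.
impressions = spark.readStream.table("impressions_raw")  # placeholder source
clicks = spark.readStream.table("clicks_raw")            # placeholder source

(impressions
  .withWatermark("impressionTime", "2 hours")
  .join(
    clicks.withWatermark("clickTime", "3 hours"),
    expr("""
      clickAdId = impressionAdId AND
      clickTime >= impressionTime AND
      clickTime <= impressionTime + interval 1 hour
    """),
    "leftOuter")  # unmatched impressions are emitted with nulls after the threshold passes
)
```

The time-range condition bounds how long each side's state must be retained before an unmatched record can be finalized.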
Control late data threshold with a multiple watermarks policy
For multiple Structured Streaming inputs, you can set multiple watermarks to control tolerance thresholds for late data. Watermarks allow you to control state information and latency.
A streaming query can have multiple input streams that are unioned or joined together. For stateful operations, each of the input streams might require a different threshold for late data tolerance. Specify these thresholds using withWatermark("eventTime", delay) on each input stream. The following is an example query with stream-stream joins.
Python

```python
input_stream1 = ...  # delays up to 1 hour
input_stream2 = ...  # delays up to 2 hours

(input_stream1
  .withWatermark("eventTime1", "1 hour")
  .join(
    input_stream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)
)
```

Scala

```scala
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours

inputStream1
  .withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)
```
While running the query with stateful operations, Structured Streaming individually tracks the maximum event time for each input stream, calculates watermarks based on the corresponding delay, and determines a single global watermark. By default, Structured Streaming uses the minimum as the global watermark. If a stream falls behind the others, a minimum global watermark prevents the query from accidentally marking data as late. For example, this might occur when one of the streams stops receiving data because of upstream failures. The global watermark safely moves at the pace of the slowest stream and delays the query output when necessary.
To reduce latency, set spark.sql.streaming.multipleWatermarkPolicy to max (default is min) to use the fastest stream's watermark as the global watermark. However, this configuration drops data from the slowest streams. Databricks recommends that you apply this configuration with caution.
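The difference between the two policies can be illustrated with a small pure-Python sketch. This is a conceptual model only, not Spark internals; the function and variable names are hypothetical:

```python
from datetime import datetime, timedelta

def global_watermark(per_stream_max_event_times, delays, policy="min"):
    """Conceptual sketch: each stream's watermark is its maximum observed
    event time minus its delay; the global watermark is the minimum (default)
    or maximum of those. Not Spark internals."""
    per_stream = [t - d for t, d in zip(per_stream_max_event_times, delays)]
    return min(per_stream) if policy == "min" else max(per_stream)

# One stream lags three hours behind the other.
stream_times = [datetime(2024, 1, 1, 12, 0), datetime(2024, 1, 1, 9, 0)]
delays = [timedelta(hours=1), timedelta(hours=2)]

print(global_watermark(stream_times, delays, "min"))  # paced by the slowest stream
print(global_watermark(stream_times, delays, "max"))  # paced by the fastest stream
```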
Apply watermarks to distinct operations
The distinct operation tracks every unique record in state. Without a watermark, state grows indefinitely and can cause memory issues. Specify a watermark on a timestamp field to bound state and remove old records after the threshold passes.
The following example applies a watermark to a distinct operation:
Python

```python
streamingDf = spark.readStream. ...  # columns: eventTime, id, value, ...

# Apply watermark before distinct operation
(streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()
)
```

Scala

```scala
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...

// Apply watermark before distinct operation
streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()
```
In this example, the streaming query removes duplicate records that arrive within 1 hour of the latest observed eventTime. The query drops state information for deduplication after the threshold passes.
Important
To deduplicate specific columns instead of all columns, use dropDuplicates() or dropDuplicatesWithinWatermark() instead of distinct. See Drop duplicates within watermark.
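For example, a hypothetical query that deduplicates on a `guid` column with `dropDuplicates()` might look like the following sketch; the source table and column names are assumptions. Including the event-time column in the key lets the watermark bound the deduplication state:

```python
# Placeholder source with guid and eventTime columns.
streamingDf = spark.readStream.table("events_raw")

# Deduplicate on guid and eventTime; state older than the watermark is dropped.
(streamingDf
  .withWatermark("eventTime", "1 hour")
  .dropDuplicates(["guid", "eventTime"])
)
```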
Drop duplicates within watermark
In Databricks Runtime 13.3 LTS or above, you can use a unique identifier to deduplicate records within a watermark threshold.
Structured Streaming guarantees exactly-once processing but doesn't deduplicate records from data sources. Use dropDuplicatesWithinWatermark to deduplicate on chosen fields even when other fields, such as event time or arrival time, differ across duplicate records.
With dropDuplicatesWithinWatermark, queries always deduplicate records that arrive within the watermark threshold. Queries might also deduplicate records that arrive outside the threshold, but this isn't guaranteed. To guarantee queries drop all duplicates, set the watermark threshold to be greater than the maximum timestamp difference between duplicate events.
You must specify a watermark to use the dropDuplicatesWithinWatermark method:
Python

```python
streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)
```

Scala

```scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(Seq("guid"))
```
Use case examples
The following examples show advanced windowing use cases:
Use tumbling windows to calculate hourly sales totals
Tumbling windows are fixed-size with non-overlapping intervals. Each input row belongs to exactly one window. Use tumbling windows to compute discrete time-period aggregations, such as hourly sales totals:
Python

```python
from pyspark.sql.functions import window, sum

hourly_sales = (orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "1 hour"))
  .agg(sum("amount").alias("total_sales"))
)
```

Scala

```scala
import org.apache.spark.sql.functions.{window, sum}

val hourlySales = orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "1 hour"))
  .agg(sum($"amount").alias("total_sales"))
```
In this example:
- `window("timestamp", "1 hour")` groups orders into non-overlapping 1-hour intervals, such as 5 to 6 AM and 6 to 7 AM.
- `withWatermark("timestamp", "1 hour")` keeps each window's aggregate in state until the window end timestamp is 1 hour older than the maximum order timestamp.
Use sliding windows to calculate rolling aggregates
Sliding windows are fixed-size with intervals that can overlap. A single row can belong to multiple windows. Use sliding windows to compute rolling aggregates, such as sales over a rolling 6-hour period:
Python

```python
from pyspark.sql.functions import window, sum

rolling_sales = (orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "6 hours", slideDuration="1 hour"))
  .agg(sum("amount").alias("total_sales"))
)
```

Scala

```scala
import org.apache.spark.sql.functions.{window, sum}

val rollingSales = orders
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "6 hours", "1 hour"))
  .agg(sum($"amount").alias("total_sales"))
```
In this example:
- `window("timestamp", "6 hours", slideDuration="1 hour")` groups orders into 6-hour intervals that advance by 1 hour, for example, 5 to 11 AM and 6 AM to 12 PM.
- `withWatermark("timestamp", "1 hour")` keeps each window's aggregate in state until the window end timestamp is 1 hour older than the maximum order timestamp.
- `slideDuration` must be less than or equal to the `windowDuration`.
Use session windows to check user activity
Session windows have no fixed size. A window opens when a row arrives and closes after a gap duration that contains no new rows. Use session windows to aggregate activity bursts between long idle periods, such as a user's page views within a 30-minute period:
Python

```python
from pyspark.sql.functions import session_window, sum

sessionized_page_views = (activity
  .withWatermark("timestamp", "1 hour")
  .groupBy("user_id", session_window("timestamp", gapDuration="30 minutes"))
  .agg(sum("page_views").alias("total_page_views"))
)
```

Scala

```scala
import org.apache.spark.sql.functions.{session_window, sum}

val sessionizedPageViews = activity
  .withWatermark("timestamp", "1 hour")
  .groupBy($"user_id", session_window($"timestamp", "30 minutes"))
  .agg(sum($"page_views").alias("total_page_views"))
```
In this example:
- `session_window("timestamp", gapDuration="30 minutes")` opens a window when the first page view arrives. Each subsequent page view that arrives within 30 minutes extends the window. When no page view arrives within 30 minutes, the window closes and the next page view starts a new window.
- `withWatermark("timestamp", "1 hour")` keeps each session's aggregate in state until the window end timestamp is 1 hour older than the maximum page view timestamp.
- The `timeColumn` argument for `window()` and `session_window()` must be of `TimestampType` or `TimestampNTZType`.
- Use `current_timestamp()` to define windows based on processing time rather than event time.
- You can set window durations from microseconds up to days. Month durations and longer are not supported.
- Use `complete` output mode with windowed aggregations to keep all window state indefinitely. Use `append` output mode with an appropriate watermark to bound state growth and prevent memory issues for large data sets. For more detail on output mode behavior, see Watermarks and output mode for windowed aggregations.