共用方式為


套用浮水印來控制資料處理閾值

本頁說明浮水印的基本概念,並提供在常見有狀態串流操作中使用浮水印的建議。 你必須對有狀態的串流操作套用浮水印,以避免無限擴充狀態中保留的資料量,這可能會引發記憶體問題或在長時間串流操作中增加處理延遲。

什麼是浮水印?

結構化串流使用水印來控制持續處理指定狀態實體更新的時間臨界值。 狀態元件的常見範例包括:

  • 時間範圍中的匯總。
  • 兩個數據流之間連接的唯一鍵值。

當您宣告浮水印時,您會在串流數據框架上指定時間戳欄位和浮水印閾值。 當新數據送達時,狀態管理員會追蹤指定欄位中最新的時間戳,並在延遲閾值內處理所有記錄。

下列範例會將 10 分鐘的水位線閾值套用至視窗計數:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

在此範例中:

  • event_time 欄可用來定義 10 分鐘水印和 5 分鐘的滾動窗口。
  • 針對每個非重疊的 5 分鐘視窗,會收集每個 id 觀察到的計數。
  • 狀態資訊會針對每個計數進行維護,直到時間範圍結束時,比觀察到的最新 event_time還要晚 10 分鐘。

重要

水印閾值保證會根據所定義查詢的語意來處理在指定閾值內抵達的記錄。 超出指定門檻的遲到紀錄仍可能使用查詢指標處理,但這並非必然。

浮水印如何影響處理時間與處理效率?

浮水印會與輸出模式互動,以控制資料寫入接收端的時機。 由於浮水印會減少要處理的狀態資訊總量,因此有效使用浮水印對於有效率的具狀態串流輸送量至關重要。

注意

並非所有的輸出模式都支援所有具狀態作業。

視窗聚合的浮水印與輸出模式

下表詳細說明具已定義水印的時間戳聚合查詢處理:

輸出模式 行為
追加 當浮水印門檻超過後,列會寫入目標表格。 所有寫入作業都會根據遲延閾值來進行延期。 舊的聚合狀態在閾值過後會被丟棄。
更新 數據列會在計算結果時寫入目標數據表,而且可在新數據送達時更新和覆寫。 舊的聚合狀態在閾值過後會被丟棄。
完成 聚合狀態不會被刪除。 目標數據表會在每次觸發程式時被重寫。

流-溪流連接的浮水印與輸出

多個數據流之間的聯結僅支援附加模式,且相符的記錄會在探索到的每個批次中寫入。 針對內連接,Databricks 建議在每個串流數據源上設定水位閾值。 這可讓捨棄舊記錄的狀態資訊變得可能。 如果沒有水印,結構化串流會在每次觸發時嘗試聯結每個鍵值來自兩端的聯結。

結構化串流具有特殊的語意,可支援外部聯結。 外部聯結必須使用浮水印,因為這表示在某個鍵未匹配後,何時必須以 Null 值寫入。 雖然外部聯結在記錄資料處理中從未匹配的紀錄上很有用,但因為聯結只會以附加操作寫入資料表,這些缺失的資料要等到超過延遲閾值後才會被記錄。

在結構化串流中使用多個浮水印原則控制延遲數據閾值

使用多個結構化串流輸入時,您可以設定多個浮水印來控制延遲抵達數據的容錯閾值。 設定水印可讓您控制狀態資訊並影響延遲。

串流查詢可以有多個聯集或聯結在一起的輸入數據流。 每個輸入數據流可以有不同的延遲數據臨界值,這些必須被容許以進行有狀態的操作。 使用 withWatermarks("eventTime", delay) 在每個輸入數據流上指定這些臨界值。 以下是具有 流-流聯結的範例查詢。

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

執行查詢時,結構化串流會個別追蹤每個輸入數據流中所看到的最大事件時間,根據對應的延遲計算水位標記,並選擇單一全域水位標記來用於具狀態的操作。 預設情況下,選擇最小值作為全域浮水印,因為這樣可以防止當其中一個串流落後於其他串流時,資料會因延遲而意外丟棄(例如,其中一個串流因上行故障而停止接收資料)。 換句話說,全域水印會以最慢的數據流速度安全地移動,並因此延遲查詢結果的輸出。

如果您想要取得更快的結果,您可以設定多重浮水印策略,藉由將 SQL 設定從 spark.sql.streaming.multipleWatermarkPolicy 調整為 max 來選擇最大值作為全局浮水印(預設值為 min)。 這可讓全球水位線以最快的數據流速度移動。 不過,此設定會從最慢的數據流卸除數據。 Databricks 建議謹慎使用此配置。

在不同操作上套用浮水印

distinct 運算是一個有狀態運算子,需要浮水印以防止無界狀態成長。 沒有浮水印,結構化串流會嘗試無限追蹤每筆獨特紀錄,這可能導致記憶體問題或處理延遲增加。

當你套用 distinct 串流資料框時,必須在時間戳記欄位上指定浮水印。 浮水印決定州經理保留重複資料刪除紀錄的時間長短。 當浮水印門檻過後,舊紀錄會從州內移除。

以下範例將浮水印應用於操作 distinct

Python

streamingDf = spark.readStream. ...  # columns: eventTime, id, value, ...

# Apply watermark before distinct operation
(streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()
)

Scala (程式語言)

val streamingDf = spark.readStream. ...  // columns: eventTime, id, value, ...

// Apply watermark before distinct operation
streamingDf
  .withWatermark("eventTime", "1 hour")
  .distinct()

在此範例中,在最新觀測到的 eventTime 記錄出現的1小時內抵達的重複紀錄會從數據流中移除。 當閾值超過後,用於去重的狀態資訊會被丟棄。

重要

如果你需要在特定欄位進行去重,而不是所有欄位,請使用 dropDuplicates()dropDuplicatesWithinWatermark() 代替 distinct。 如需詳細資訊,請參閱下一節。

去除浮水印之內的重複項目

在 Databricks Runtime 13.3 LTS 或更新版本中,你可以使用唯一識別碼在浮水印閾值內進行重複紀錄的去重。

結構化串流提供精確一次處理的保證,但不會自動從資料來源中刪除記錄。 您可以使用 dropDuplicatesWithinWatermark 在任何指定的欄位上進行去重,從而讓您即使某些欄位不同(例如事件時間或抵達時間)也能從資料流中移除重複的記錄。

保證會在指定水印內抵達的重複記錄被移除。 此保證僅在一個方向上具有嚴格限制,並且超過指定閾值的重複記錄也可能會被捨棄。 您必須將浮水印的延遲臨界值設定得比重複事件之間的時間戳記的最大差異長,以移除所有重複事件。

您必須指定浮水印以使用 dropDuplicatesWithinWatermark 方法,如下列範例所示:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala (程式語言)

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(Seq("guid"))