分享方式:


使用浮浮浮水印優化 Delta 即時數據表中的具狀態處理

若要有效管理保持狀態的數據,請在 Delta Live Tables 中執行具狀態數據流處理時使用浮浮浮水印,包括匯總、聯結和重複數據刪除。 本文說明如何在 Delta Live Tables 查詢中使用浮浮水印,並包含建議作業的範例。

注意

若要確保執行匯總的查詢會以累加方式處理,且未完全重新計算每個更新,您必須使用浮浮浮水印。

什麼是浮浮浮浮水印?

在串流處理中,浮水印是 Apache Spark 功能,可在執行匯總等具狀態作業時定義處理數據的以時間為基礎的閾值。 抵達的數據會經過處理,直到達到臨界值為止,此時臨界值所定義的時間範圍就會關閉。 浮浮水印可用來避免查詢處理期間發生問題,主要是在處理較大的數據集或長時間執行的處理時。 這些問題可能包括產生結果的高延遲,甚至記憶體不足 (OOM) 錯誤,因為處理期間保持狀態的數據量。 由於串流數據原本就未排序,浮水印也支援正確計算時間範圍匯總等作業。

若要深入瞭解如何在串流處理中使用浮浮水印,請參閱 Apache Spark 結構化串流 中的浮浮水印和 套用水印來控制數據處理閾值

如何定義浮浮水印?

您可以藉由指定時間戳字段和值來定義浮水印,代表延遲數據到達的時間閾值。 如果數據在定義的時間臨界值之後到達,則會將其視為遲到。 例如,如果閾值定義為 10 分鐘,可能會卸除 10 分鐘閾值之後抵達的記錄。

由於在定義的臨界值之後抵達的記錄可能會遭到捨棄,因此選取符合延遲與正確性需求的閾值非常重要。 選擇較小的臨界值會導致更快發出記錄,但也表示較晚的記錄更有可能遭到捨棄。 較大的臨界值表示較長的等候時間,但可能更完整的數據。 由於狀態大小較大,因此較大的臨界值可能也需要額外的運算資源。 因為臨界值取決於您的數據和處理需求,因此測試和監視您的處理對於判斷最佳閾值很重要。

您可以使用 Python 中的 函 withWatermark() 式來定義浮水印。 在 SQL 中 WATERMARK ,使用 子句來定義浮水印:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

搭配串流聯結使用浮浮浮水印

針對數據流聯結,您必須在聯結的兩側定義浮水印和時間間隔子句。 因為每個聯結來源都有數據不完整的檢視,因此需要時間間隔子句,才能告訴串流引擎何時無法進行進一步的相符專案。 時間間隔子句必須使用用來定義浮水印的相同欄位。

因為有時候每個數據流都需要不同的浮浮列印臨界值,因此數據流不需要有相同的臨界值。 為了避免遺漏數據,串流引擎會根據最慢的數據流維護一個全域水位線。

下列範例會聯結廣告曝光串流,以及使用者點擊廣告的串流。 在此範例中,點擊必須在曝光后的 3 分鐘內發生。 經過 3 分鐘的時間間隔之後,卸除無法再比對之狀態的數據列。

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

使用浮浮浮水印執行視窗式匯總

串流數據的常見具狀態作業是視窗式匯總。 視窗匯總類似於群組匯總,不同之處在於會針對屬於定義視窗一部分的數據列集傳回匯總值。

視窗可以定義為特定長度,而且匯總作業可以在屬於該視窗的所有數據列上執行。 Spark 串流支援三種類型的視窗:

  • 輪轉 (固定) 視窗:一系列的固定大小、非重疊和連續時間間隔。 輸入記錄只屬於單一視窗。
  • 滑動視窗:類似於輪轉視窗,滑動視窗是固定大小的,但視窗可以重疊,而記錄可以落入多個視窗。

當數據到達視窗結尾加上浮水印的長度時,不會接受視窗的新數據、發出匯總的結果,以及卸除窗口的狀態。

下列範例會使用固定視窗每 5 分鐘計算一次曝光數的總和。 在此範例中,select 子句會使用別名 impressions_window,然後視窗本身會定義為 子句的 GROUP BY 一部分。 窗口必須以與浮浮浮水印相同的時間戳數據行為基礎, clickTimestamp 本範例中的數據行。

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Python 中的類似範例,用來計算每小時固定時段的利潤:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    dlt.read_stream("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

重複資料刪除串流記錄

結構化串流具有完全相同的一次處理保證,但不會自動從數據源取消重複的記錄。 例如,由於許多消息佇列至少有一次保證,因此從其中一個消息佇列讀取時,應該預期重複的記錄。 您可以使用 函 dropDuplicatesWithinWatermark() 式來取消任何指定字段上的重複記錄,即使某些欄位不同(例如事件時間或抵達時間)也會從數據流中移除重複專案。 您必須指定浮水印才能使用 函 dropDuplicatesWithinWatermark() 式。 所有抵達浮浮浮浮水印所指定時間範圍內的重複數據都會卸除。

已排序的數據很重要,因為順序錯亂的數據會導致浮水印值不正確地向前跳躍。 然後,當較舊的數據到達時,會被視為遲到且已卸除。 withEventTimeOrder使用 選項,根據浮浮浮水印中指定的時間戳,依序處理初始快照集。 選項withEventTimeOrder可以在定義數據集的程式代碼中,或使用 在管線設定spark.databricks.delta.withEventTimeOrder.enabled宣告。 例如:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

注意

此選項 withEventTimeOrder 僅支援 Python。

在下列範例中,會依 clickTimestamp排序數據,並在包含重複 userIdclickAdId 數據行的 5 秒內抵達的記錄會卸除。

clicksDedupDf = (
  spark.readStream
    .option("withEventTimeOrder", "true")
    .table(rawClicks)
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

優化管線組態以進行具狀態處理

為了協助防止生產問題和過度延遲,Databricks 建議為您的具狀態串流處理啟用 RocksDB 型狀態管理,特別是如果您的處理需要節省大量的中繼狀態。

無伺服器管線會自動管理狀態存放區設定。

您可以在部署管線之前設定下列設定,以啟用 RocksDB 型狀態管理:

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

若要深入瞭解 RocksDB 狀態存放區,包括 RocksDB 的設定建議,請參閱在 Azure Databricks 上設定 RocksDB 狀態存放區