若要有效管理保持在狀態的資料,請在 Lakeflow Spark 宣告式管線中執行具狀態資料流程處理時使用浮水印,包括彙總、聯結和重複資料刪除。 本文說明如何在管線查詢中使用浮水印,並包含建議作業的範例。
備註
若要確保執行彙總的查詢會以累加方式處理,而不是在每次更新時完全重新計算,您必須使用浮水印。
什麼是浮水印?
在串流處理中, 浮水印 是 Apache Spark 的一項功能,可在執行有狀態作業 (例如彙總) 時定義以時間為基礎的臨界值來處理資料。 到達的資料會處理,直到達到臨界值為止,此時臨界值所定義的時間範圍會關閉。 浮水印可以用來避免查詢處理過程中出現問題,主要是在處理較大的資料集或長時間運行的處理時。 這些問題可能包括在產生結果時的高延遲,甚至因為在處理期間維持狀態的資料量過大而導致記憶體不足 (OOM) 錯誤。 由於串流資料本質上是無序的,因此浮水印也支援正確計算作業,例如時間範圍彙總。
若要深入瞭解如何在串流處理中使用浮水印,請參閱 Apache Spark 結構化串流中的浮水印 和 套用浮水印來控制資料處理臨界值。
如何定義浮水印?
您可以透過指定時間戳字段和表示延遲資料到達時間界限的值來定義水印。 如果資料在定義的時間臨界值之後到達,則會被視為延遲。 例如,如果臨界值定義為 10 分鐘,則在超過 10 分鐘臨界值之後到達的記錄可能會被捨棄。
由於在定義臨界值之後到達的記錄可能會被捨棄,因此選取符合延遲與正確性需求的臨界值非常重要。 選擇較小的臨界值會導致更快發出記錄,但也表示延遲的記錄更有可能被捨棄。 較大的臨界值意味著等待時間更長,但資料的完整性可能更高。 由於狀態大小較大,因此較大的臨界值可能也需要額外的運算資源。 由於臨界值取決於您的資料和處理需求,因此測試和監視您的處理對於判斷最佳臨界值非常重要。
您可以使用 Python 中的函數 withWatermark() 來定義浮水印。 在SQL中,使用 WATERMARK 子句來定義浮水印:
Python
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
將浮水印與串流聯結搭配使用
對於流式流結合,您必須為結合的兩端定義浮水印和時間間隔條款。 因為每個連接來源對資料的檢視都是不完整的,因此需要時間間隔子句來告訴串流引擎何時無法再進行匹配。 時間間隔子句必須使用用來定義浮水印的相同欄位。
因為有時每個串流可能需要不同的浮水印臨界值,所以串流不需要具有相同的臨界值。 為了避免遺失資料,串流引擎會根據最慢的串流維護一個全域浮水印。
下列範例會加入廣告曝光串流和使用者點按廣告串流。 在此範例中,點擊必須在曝光後 3 分鐘內發生。 經過 3 分鐘的時間間隔之後,會捨棄無法再比對的狀態中的資料列。
Python
from pyspark import pipelines as dp
dp.create_streaming_table("adImpressionClicks")
@dp.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
執行具有浮水印的視窗彙總
串流資料的常見具狀態作業是視窗匯總。 視窗彙總與群組彙總類似,不同之處在於會針對屬於已定義視窗一部分的資料列集傳回彙總值。
視窗可以定義為特定長度,並且可以在屬於該視窗的所有列上執行聚合操作。 Spark 串流支援三種類型的視窗:
- 翻滾 (固定) 視窗:一系列固定大小、不重疊且連續的時間間隔。 輸入記錄只屬於單一視窗。
- 滑動視窗:與翻滾視窗類似,滑動視窗是固定大小的,但視窗可以重疊,記錄可以落入多個視窗。
當資料到達超過視窗結尾加上浮水印長度時,視窗將不再接受任何新資料,並會觸發彙總結果的產生,之後視窗的狀態將被捨棄。
下列範例使用固定的窗口,每隔 5 分鐘計算曝光總和。 在此範例中,select 子句會使用別名 impressions_window,然後視窗本身會定義為子 GROUP BY 句的一部分。 此視窗的基礎必須是與浮水印相同的時間戳記欄位,即此範例中的 clickTimestamp 欄位。
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, window(clickTimestamp, "5 minutes") as impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
用於計算每小時固定時段利潤的 Python 類似範例:
from pyspark import pipelines as dp
@dp.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
刪除重複的串流記錄
結構化串流具有精確一次的處理保證,但不會自動從資料來源刪除重複的記錄。 例如,因為許多訊息佇列至少具有一次保證,所以從其中一個訊息佇列讀取時,應該預期會有重複的記錄。 您可以使用此功能 dropDuplicatesWithinWatermark() 來刪除任何指定欄位上的重複記錄,即使某些欄位不同(例如事件時間或到達時間),也可以從串流中移除重複項。 您必須指定浮水印才能使用該 dropDuplicatesWithinWatermark() 函數。 在浮水印指定的時間範圍內到達的所有重複資料都會被刪除。
排序資料很重要,因為無序資料會導致浮水印值錯誤地跳到前面。 然後,當較舊的資料到達時,會被視為延遲並捨棄。 使用該 withEventTimeOrder 選項,根據浮水印中指定的時間戳記,依序處理初始快照。 您可以在定義資料集的程式碼中宣告此withEventTimeOrder選項,或在spark.databricks.delta.withEventTimeOrder.enabled中使用 。 例如:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
備註
只有 Python 支援此 withEventTimeOrder 選項。
在下列範例中,資料會依clickTimestamp排序處理,並捨棄在5秒內到達且包含重複userId和clickAdId欄位的記錄。
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
最佳化有狀態處理的管線設定
為了協助防止生產問題和過多的延遲,Databricks 建議針對具狀態資料流程處理啟用 RocksDB 型狀態管理,特別是當您的處理需要儲存大量中繼狀態時。
Severless 管線會自動管理狀態存放區設定。
您可以在部署管線之前設定下列組態,以啟用 RocksDB 型狀態管理:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
若要深入瞭解 RocksDB 狀態存放區,包括 RocksDB 的設定建議,請參閱 在 Azure Databricks 上設定 RocksDB 狀態存放區。