本頁展示了如何利用 foreachBatch 結構化串流,將串流查詢的輸出寫入沒有現有串流匯入的資料來源。
程序代碼模式 streamingDF.writeStream.foreachBatch(...) 可讓您將批次函式套用至串流查詢每個微批次的輸出數據。 搭配 foreachBatch 使用的函式會採用兩個參數:
- 包含微批次輸出資料的 DataFrame。
- 微批次的唯一標識碼。
在進行 Delta Lake 合併作業時,您必須在結構化串流中使用 foreachBatch。 請參閱從串流查詢使用foreachBatch的Upsert。
套用更多的 DataFrame 操作
許多 DataFrame 和數據集作業不支援於串流數據框架中,因為在這些情況下 Spark 不支援產生增量計劃。 您可以使用 foreachBatch() ,在每個微批次輸出上套用其中一些作業。 例如,您可以使用 foreachBatch() 和 SQL MERGE INTO 作業,在更新模式中將串流匯總的輸出寫入 Delta 資料表。 如需詳細資訊,請參閱 MERGE INTO。
重要
-
foreachBatch()僅提供至少一次的寫入保證。 不過,您可以使用提供給函數的batchId作為去重輸出的方式,並獲得一次且僅一次的保證。 不論是哪一種情況,您都必須自行考慮端對端語意。 -
foreachBatch()無法使用 連續處理模式 ,因為它基本上依賴串流查詢的微批次執行。 如果您以連續模式寫入資料,請改用foreach()。 - 使用
foreachBatch搭配具狀態的運算符時,務必在完成處理前,先完全消化每個批次。 請參閱 完整處理每個批次的 DataFrame
處理空資料框架
foreachBatch() 可能會收到空的 DataFrame,而你的程式碼必須處理這種情況。 否則,你的查詢可能會失敗。
例如,當 Delta Lake 是串流來源時,這些情境可能會將空資料幀傳遞給 foreachBatch():
-
OPTIMIZE沒有檔案可處理:當OPTIMIZE操作在 Delta Lake 來源資料表上執行但沒有可處理的檔案時,結構化串流會寫入偏移量日誌條目以遞增資料表版本。 這會在匯入系統產生一個空的微批次,儘管沒有讀取任何檔案。 - 在實體計畫層級進行檔案剪枝:如果謂詞推下或檔案修剪消除了所有實體計畫層級的所有紀錄,結果是對匯的提交為空。
使用者程式碼必須處理空資料幀,才能正常運作。 請參閱下列範例:
Python
def process_batch(output_df, batch_id):
# Process valid DataFrames only
if not output_df.isEmpty():
# business logic
pass
streamingDF.writeStream.foreachBatch(process_batch).start()
Scala
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid DataFrames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Databricks Runtime 14.0 中的行為變更foreachBatch
在 Databricks 執行環境 14.0 及以上版本中,於設定為標準存取模式的計算上,會套用下列行為變更:
-
print()命令會將輸出寫入驅動程式記錄。 - 您無法存取函式內的
dbutils.widgets子模組。 - 函式中參考的任何檔案、模組或對象都必須可串行化,且可在 Spark 上使用。
重複使用現有的批次數據源
您可以使用 foreachBatch(),針對可能沒有結構化串流支持的數據接收器,使用現有的批次數據寫入器。 以下是一些範例:
您可以從foreachBatch()使用許多其他的批次數據來源。 請參閱 連線至數據源和外部服務。
寫入多個目的地位置
如果您需要將串流查詢的輸出寫入多個位置,Databricks 建議使用多個結構化串流寫入器來獲得最佳平行處理和輸送量。
使用 foreachBatch 將資料寫入多個接收者會串行化執行串流寫入作業,這可能會增加每個微批次處理的延遲。
如果你確實需要使用foreachBatch寫入多個 Delta 表,請參見「冪等性表寫入的使用foreachBatch」。
完全處理每個批次 DataFrame
當您使用具狀態運算符時(例如,使用 dropDuplicatesWithinWatermark),每個批次反覆專案都必須取用整個 DataFrame 或重新啟動查詢。 如果您未取用整個 DataFrame,串流查詢將會因為下一個批次而失敗。
這可能會在數種情況下發生。 下列範例示範如何修正未正確取用 DataFrame 的查詢。
刻意使用數據批次的子集
如果您只關心批次的子集,則可以有如下的程序代碼。
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
在此情況下, batch_df.show(2) 只會處理批次中的前兩個專案,這是預期的,但如果有更多專案,則必須取用這些專案。 下列程式代碼會取用完整的 DataFrame。
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
在此,函 do_nothing 式會以無訊息方式忽略 DataFrame 的其餘部分。
處理批次中的錯誤
在foreachBatch中進行錯誤處理時,Databricks 建議允許串流查詢快速失敗,並依賴編排層(例如 Lakeflow Jobs 或 Apache Airflow)來管理重試邏輯。 這比起在程式碼中建立複雜的重試迴圈安全得多,因為那樣可能會發生資料遺失。
以下是根據你的寫作目標制定的指引:
| 目標 | Examples | 指導 |
|---|---|---|
| DataFrame 操作 | Delta Lake 資料表 | 你必須使用 txnAppId 和 txnVersion 寫入選項, 將 txnVersion 綁定到 batchId,以保證冪等性並保護重試時的資料正確性。 不要在本地抓取並重試例外。 Databricks 建議允許錯誤傳播,這樣 Spark 指標才能保持準確,資料不會重複,且編排器能乾淨地重試整個批次。 |
| 自訂程式碼與外部目的地 |
.collect()、OLTP 資料庫、訊息佇列、API |
實施你自己的冪性理論。 你必須假設任何操作都可以且會在批次間重複嘗試。 如果保持 batchId 不變,手術結果也必須保持不變。 你可以重試純暫時錯誤,例如短暫連線逾時,但若重試失敗,務必避免部分或重複寫入。 最安全的做法是讓錯誤自行傳播,並讓編排器重新嘗試整個批次。 |
以下是一些例外類型範例及處理建議:foreachBatch
| 例外狀況類型 | Examples | 建議的動作 |
|---|---|---|
| 瞬態匯誤差 |
SQLTransientConnectionException、HTTP 429、逾時 |
捕捉:重試,或者傳送到死信佇列 |
| 當接收端是冪等的時候,會發生重複或金鑰約束違規 | SQLIntegrityConstraintViolationException |
捕捉:記錄並抑制 |
| 自訂可重試錯誤 | 封裝套接字例外、可重試資料庫錯誤 | 缺點:增量指標並允許可控的延續 |
| 邏輯或結構錯誤 |
NullPointerException、 AttributeError結構不匹配 |
傳播:讓 Spark 查詢失敗 |
| 無法重試的匯入錯誤或未被發現的邏輯錯誤 |
ValueError、PermissionError |
傳播:讓 Spark 查詢失敗 |
| 關鍵故障 |
OutOfMemoryError、損壞狀態、資料完整性違規 |
傳播:讓 Spark 查詢失敗 |
程式碼範例:例外處理
以下範例故意提出錯誤, foreach 以展示處理錯誤的不同方法:
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
上述程式碼處理並靜默抑制錯誤,且可能不會消耗批次的其餘部分。 有兩個選項可用來處理這種情況。
首先,你可以重新提出錯誤,然後把錯誤傳給你的協調層,重新嘗試批次。 如果是暫時性問題,這可以解決錯誤;否則,將問題提交給你的營運團隊以嘗試手動修正。 若要這樣做,請將程式 partial_func 代碼變更為如下所示:
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
第二,如果你想攔截例外並忽略批次的其他部分,可以修改程式碼,使用 do_nothing 函式靜靜忽略批次的其他部分。
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()