使用 foreachBatch 寫入任意數據接收器
本文討論如何使用 foreachBatch
結構化串流,將串流查詢的輸出寫入至沒有現有串流接收的數據源。
程序代碼模式 streamingDF.writeStream.foreachBatch(...)
可讓您將批次函式套用至串流查詢每個微批次的輸出數據。 搭配 使用的函式採用 foreachBatch
兩個參數:
- 具有微批次輸出數據的 DataFrame。
- 微批次的唯一標識碼。
您必須在 foreachBatch
結構化串流中使用 Delta Lake 合併作業。 請參閱 使用 foreachBatch 從串流查詢的 Upsert。
套用其他 DataFrame 作業
串流數據框架不支援許多 DataFrame 和數據集作業,因為 Spark 不支援在這些情況下產生累加計劃。 您可以使用 foreachBatch()
,在每個微批次輸出上套用其中一些作業。 例如,您可以使用 foreachBath()
和 SQL MERGE INTO
作業,將串流匯總的輸出寫入更新模式中的 Delta 資料表。 如需詳細資訊, 請參閱 MERGE INTO。
重要
foreachBatch()
只提供至少一次寫入保證。 不過,您可以使用batchId
提供給 函式的 作為重複資料刪除輸出的方式,並取得一次完全保證。 不論是哪一種情況,您都必須自行考慮端對端語意。foreachBatch()
無法使用 連續處理模式 ,因為它基本上依賴串流查詢的微批次執行。 如果您以連續模式寫入資料,請改用foreach()
。
您可以使用 叫用 foreachBatch()
空的數據框架,而且使用者程式代碼必須具有復原性,才能進行適當的作業。 以下顯示一個範例:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Databricks Runtime 14.0 中的行為變更foreachBatch
在以共用存取模式設定的計算上,在 Databricks Runtime 14.0 和更新版本中, forEachBatch
於 Apache Spark 上以個別隔離的 Python 進程執行,而不是在 REPL 環境中執行。 它會串行化並推送至Spark,而且在會話期間無法存取全域 spark
物件。
在所有其他計算組態中, foreachBatch
會在執行其餘程序代碼的相同 Python REPL 中執行。 因此,函式不會串行化。
當您在以共用存取模式設定的計算上使用 Databricks Runtime 14.0 和更新版本時,您必須在 sparkSession
Python 中使用 foreachBatch
範圍設定為本機 DataFrame 的變數,如下列程式代碼範例所示:
def example_function(df, batch_id):
df.sparkSession.sql("<query>")
套用下列行為變更:
- 您無法從函式記憶體取任何全域 Python 變數。
print()
命令會將輸出寫入驅動程序記錄。- 函式中參考的任何檔案、模組或對象都必須可串行化,且可在Spark上使用。
重複使用現有的批次數據源
您可以使用 foreachBatch()
,針對可能沒有結構化串流支持的數據接收器,使用現有的批次數據寫入器。 以下是一些範例:
您可以從 使用 foreachBatch()
許多其他批次數據來源。 請參閱數據源 連線。
寫入多個位置
如果您需要將串流查詢的輸出寫入多個位置,Databricks 建議使用多個結構化串流寫入器來獲得最佳平行處理和輸送量。
使用 foreachBatch
來寫入多個接收會串行化串流寫入的執行,這可能會增加每個微批次的延遲。
如果您使用 foreachBatch
寫入多個 Delta 數據表,請參閱 foreachBatch 中的等冪數據表寫入。
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應