使用 foreachBatch 寫入任意資料匯入端

本頁展示了如何利用 foreachBatch 結構化串流,將串流查詢的輸出寫入沒有現有串流匯入的資料來源。

程序代碼模式 streamingDF.writeStream.foreachBatch(...) 可讓您將批次函式套用至串流查詢每個微批次的輸出數據。 搭配 foreachBatch 使用的函式會採用兩個參數:

  • 包含微批次輸出資料的 DataFrame。
  • 微批次的唯一標識碼。

在進行 Delta Lake 合併作業時,您必須在結構化串流中使用 foreachBatch。 請參閱從串流查詢使用foreachBatch的Upsert

套用更多的 DataFrame 操作

許多 DataFrame 和數據集作業不支援於串流數據框架中,因為在這些情況下 Spark 不支援產生增量計劃。 您可以使用 foreachBatch() ,在每個微批次輸出上套用其中一些作業。 例如,您可以使用 foreachBatch() 和 SQL MERGE INTO 作業,在更新模式中將串流匯總的輸出寫入 Delta 資料表。 如需詳細資訊,請參閱 MERGE INTO

重要

  • foreachBatch() 僅提供至少一次的寫入保證。 不過,您可以使用提供給函數的 batchId 作為去重輸出的方式,並獲得一次且僅一次的保證。 不論是哪一種情況,您都必須自行考慮端對端語意。
  • foreachBatch() 無法使用 連續處理模式 ,因為它基本上依賴串流查詢的微批次執行。 如果您以連續模式寫入資料,請改用 foreach()
  • 使用 foreachBatch 搭配具狀態的運算符時,務必在完成處理前,先完全消化每個批次。 請參閱 完整處理每個批次的 DataFrame

處理空資料框架

foreachBatch() 可能會收到空的 DataFrame,而你的程式碼必須處理這種情況。 否則,你的查詢可能會失敗。

例如,當 Delta Lake 是串流來源時,這些情境可能會將空資料幀傳遞給 foreachBatch()

  • OPTIMIZE 沒有檔案可處理:當 OPTIMIZE 操作在 Delta Lake 來源資料表上執行但沒有可處理的檔案時,結構化串流會寫入偏移量日誌條目以遞增資料表版本。 這會在匯入系統產生一個空的微批次,儘管沒有讀取任何檔案。
  • 在實體計畫層級進行檔案剪枝:如果謂詞推下或檔案修剪消除了所有實體計畫層級的所有紀錄,結果是對匯的提交為空。

使用者程式碼必須處理空資料幀,才能正常運作。 請參閱下列範例:

Python

def process_batch(output_df, batch_id):
  # Process valid DataFrames only
  if not output_df.isEmpty():
    # business logic
    pass

streamingDF.writeStream.foreachBatch(process_batch).start()

Scala

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid DataFrames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Databricks Runtime 14.0 中的行為變更foreachBatch

在 Databricks 執行環境 14.0 及以上版本中,於設定為標準存取模式的計算上,會套用下列行為變更:

  • print() 命令會將輸出寫入驅動程式記錄。
  • 您無法存取函式內的 dbutils.widgets 子模組。
  • 函式中參考的任何檔案、模組或對象都必須可串行化,且可在 Spark 上使用。

重複使用現有的批次數據源

您可以使用 foreachBatch(),針對可能沒有結構化串流支持的數據接收器,使用現有的批次數據寫入器。 以下是一些範例:

您可以從foreachBatch()使用許多其他的批次數據來源。 請參閱 連線至數據源和外部服務

寫入多個目的地位置

如果您需要將串流查詢的輸出寫入多個位置,Databricks 建議使用多個結構化串流寫入器來獲得最佳平行處理和輸送量。

使用 foreachBatch 將資料寫入多個接收者會串行化執行串流寫入作業,這可能會增加每個微批次處理的延遲。

如果你確實需要使用foreachBatch寫入多個 Delta 表,請參見「冪等性表寫入的使用foreachBatch」。

完全處理每個批次 DataFrame

當您使用具狀態運算符時(例如,使用 dropDuplicatesWithinWatermark),每個批次反覆專案都必須取用整個 DataFrame 或重新啟動查詢。 如果您未取用整個 DataFrame,串流查詢將會因為下一個批次而失敗。

這可能會在數種情況下發生。 下列範例示範如何修正未正確取用 DataFrame 的查詢。

刻意使用數據批次的子集

如果您只關心批次的子集,則可以有如下的程序代碼。

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
  batch_df.show(2)

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

在此情況下, batch_df.show(2) 只會處理批次中的前兩個專案,這是預期的,但如果有更多專案,則必須取用這些專案。 下列程式代碼會取用完整的 DataFrame。

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row):
  pass

def partial_func(batch_df, batch_id):
  batch_df.show(2)
  batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

在此,函 do_nothing 式會以無訊息方式忽略 DataFrame 的其餘部分。

處理批次中的錯誤

foreachBatch中進行錯誤處理時,Databricks 建議允許串流查詢快速失敗,並依賴編排層(例如 Lakeflow Jobs 或 Apache Airflow)來管理重試邏輯。 這比起在程式碼中建立複雜的重試迴圈安全得多,因為那樣可能會發生資料遺失。

以下是根據你的寫作目標制定的指引:

目標 Examples 指導
DataFrame 操作 Delta Lake 資料表 你必須使用 txnAppIdtxnVersion 寫入選項, 將 txnVersion 綁定到 batchId,以保證冪等性並保護重試時的資料正確性。 不要在本地抓取並重試例外。 Databricks 建議允許錯誤傳播,這樣 Spark 指標才能保持準確,資料不會重複,且編排器能乾淨地重試整個批次。
自訂程式碼與外部目的地 .collect()、OLTP 資料庫、訊息佇列、API 實施你自己的冪性理論。 你必須假設任何操作都可以且會在批次間重複嘗試。 如果保持 batchId 不變,手術結果也必須保持不變。 你可以重試純暫時錯誤,例如短暫連線逾時,但若重試失敗,務必避免部分或重複寫入。 最安全的做法是讓錯誤自行傳播,並讓編排器重新嘗試整個批次。

以下是一些例外類型範例及處理建議:foreachBatch

例外狀況類型 Examples 建議的動作
瞬態匯誤差 SQLTransientConnectionException、HTTP 429、逾時 捕捉:重試,或者傳送到死信佇列
當接收端是冪等的時候,會發生重複或金鑰約束違規 SQLIntegrityConstraintViolationException 捕捉:記錄並抑制
自訂可重試錯誤 封裝套接字例外、可重試資料庫錯誤 缺點:增量指標並允許可控的延續
邏輯或結構錯誤 NullPointerExceptionAttributeError結構不匹配 傳播:讓 Spark 查詢失敗
無法重試的匯入錯誤或未被發現的邏輯錯誤 ValueErrorPermissionError 傳播:讓 Spark 查詢失敗
關鍵故障 OutOfMemoryError、損壞狀態、資料完整性違規 傳播:讓 Spark 查詢失敗

程式碼範例:例外處理

以下範例故意提出錯誤, foreach 以展示處理錯誤的不同方法:

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

上述程式碼處理並靜默抑制錯誤,且可能不會消耗批次的其餘部分。 有兩個選項可用來處理這種情況。

首先,你可以重新提出錯誤,然後把錯誤傳給你的協調層,重新嘗試批次。 如果是暫時性問題,這可以解決錯誤;否則,將問題提交給你的營運團隊以嘗試手動修正。 若要這樣做,請將程式 partial_func 代碼變更為如下所示:

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    raise e # re-raise the issue

第二,如果你想攔截例外並忽略批次的其他部分,可以修改程式碼,使用 do_nothing 函式靜靜忽略批次的其他部分。

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

# function to do nothing with a row
def do_nothing(row):
    pass

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()