共用方式為


使用 foreachBatch 寫入任意數據接收器

本文討論如何使用 foreachBatch 結構化串流,將串流查詢的輸出寫入至沒有現有串流接收的數據源。

程序代碼模式 streamingDF.writeStream.foreachBatch(...) 可讓您將批次函式套用至串流查詢每個微批次的輸出數據。 搭配 foreachBatch 使用的函式會採用兩個參數:

  • 具有微批次輸出數據的 DataFrame。
  • 微批次的唯一標識碼。

您必須在 foreachBatch 結構化串流中使用 Delta Lake 合併作業。 請參閱從串流查詢使用foreachBatch的Upsert

套用其他 DataFrame 作業

串流數據框架不支援許多 DataFrame 和數據集作業,因為 Spark 不支援在這些情況下產生累加計劃。 您可以使用 foreachBatch() ,在每個微批次輸出上套用其中一些作業。 例如,您可以使用 foreachBatch() 和 SQL MERGE INTO 作業,在更新模式中將串流匯總的輸出寫入 Delta 資料表。 如需詳細資訊,請參閱 MERGE INTO

重要

  • foreachBatch() 只提供至少一次寫入保證。 不過,您可以使用提供給函數的 batchId 作為去重輸出的方式,並獲得一次且僅一次的保證。 不論是哪一種情況,您都必須自行考慮端對端語意。
  • foreachBatch() 無法使用 連續處理模式 ,因為它基本上依賴串流查詢的微批次執行。 如果您以連續模式寫入資料,請改用 foreach()
  • 使用 foreachBatch 搭配具狀態的運算符時,務必在完成處理前,先完全消化每個批次。 請參閱 完整處理每個批次的 DataFrame

您可以使用 叫用 foreachBatch() 空的數據框架,而且使用者程式代碼必須具有復原性,才能進行適當的作業。 以下顯示一個範例:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Databricks Runtime 14.0 中的行為變更foreachBatch

在 Databricks 執行環境 14.0 及以上版本中,於設定為標準存取模式的計算上,會套用下列行為變更:

  • print() 命令會將輸出寫入驅動程式記錄。
  • 您無法存取函式內的 dbutils.widgets 子模組。
  • 函式中參考的任何檔案、模組或對象都必須可串行化,且可在 Spark 上使用。

重複使用現有的批次數據源

您可以使用 foreachBatch(),針對可能沒有結構化串流支持的數據接收器,使用現有的批次數據寫入器。 以下是一些範例:

您可以從 使用 foreachBatch()許多其他批次數據來源。 請參閱 連線至數據源和外部服務

寫入多個位置

如果您需要將串流查詢的輸出寫入多個位置,Databricks 建議使用多個結構化串流寫入器來獲得最佳平行處理和輸送量。

使用 foreachBatch 來寫入多個接收會串行化串流寫入的執行,這可能會增加每個微批次的延遲。

如果您使用 foreachBatch 來寫入多個 Delta 數據表,請參閱 中的foreachBatch等冪數據表寫入

完全處理每個批次 DataFrame

當您使用具狀態運算符時(例如,使用 dropDuplicatesWithinWatermark),每個批次反覆專案都必須取用整個 DataFrame 或重新啟動查詢。 如果您未取用整個 DataFrame,串流查詢將會因為下一個批次而失敗。

這可能會在數種情況下發生。 下列範例示範如何修正未正確取用 DataFrame 的查詢。

刻意使用數據批次的子集

如果您只關心批次的子集,則可以有如下的程序代碼。

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
  batch_df.show(2)

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

在此情況下, batch_df.show(2) 只會處理批次中的前兩個專案,這是預期的,但如果有更多專案,則必須取用這些專案。 下列程式代碼會取用完整的 DataFrame。

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row)
  pass

def partial_func(batch_df, batch_id):
  batch_df.show(2)
  batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

在此,函 do_nothing 式會以無訊息方式忽略 DataFrame 的其餘部分。

處理批次中的錯誤

執行 foreachBatch 進程時可能會發生錯誤。 您可以有如下的程式代碼(在此情況下,範例會刻意引發錯誤以顯示問題)。

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

在處理(並靜默吞掉)錯誤時,批次的其餘部分可能無法被處理。 有兩個選項可用來處理這種情況。

首先,您可以重新引發錯誤,將錯誤傳遞至調度層以重新嘗試批次處理。 如果這是暫時問題,可以解決此錯誤;如果不是,則將問題通知您的作業小組,以嘗試手動解決問題。 若要這樣做,請將程式 partial_func 代碼變更為如下所示:

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    raise e # re-raise the issue

如果您想要攔截例外狀況並忽略批次的其餘部分,第二個選項是將程式代碼變更為這個。

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

# function to do nothing with a row
def do_nothing(row)
    pass

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

此程式代碼會使用函 do_nothing 式以無訊息方式忽略批次的其餘部分。