共用方式為


使用 foreachBatch 寫入任意數據接收器

本文討論如何使用 foreachBatch 結構化串流,將串流查詢的輸出寫入至沒有現有串流接收的數據源。

程序代碼模式 streamingDF.writeStream.foreachBatch(...) 可讓您將批次函式套用至串流查詢每個微批次的輸出數據。 搭配 使用的函式採用 foreachBatch 兩個參數:

  • 具有微批次輸出數據的 DataFrame。
  • 微批次的唯一標識碼。

您必須在 foreachBatch 結構化串流中使用 Delta Lake 合併作業。 請參閱 使用 foreachBatch 從串流查詢的 Upsert。

套用其他 DataFrame 作業

串流數據框架不支援許多 DataFrame 和數據集作業,因為 Spark 不支援在這些情況下產生累加計劃。 您可以使用 foreachBatch() ,在每個微批次輸出上套用其中一些作業。 例如,您可以使用 foreachBath() 和 SQL MERGE INTO 作業,將串流匯總的輸出寫入更新模式中的 Delta 資料表。 如需詳細資訊, 請參閱 MERGE INTO

重要

  • foreachBatch() 只提供至少一次寫入保證。 不過,您可以使用 batchId 提供給 函式的 作為重複資料刪除輸出的方式,並取得一次完全保證。 不論是哪一種情況,您都必須自行考慮端對端語意。
  • foreachBatch() 無法使用 連續處理模式 ,因為它基本上依賴串流查詢的微批次執行。 如果您以連續模式寫入資料,請改用 foreach()

您可以使用 叫用 foreachBatch() 空的數據框架,而且使用者程式代碼必須具有復原性,才能進行適當的作業。 以下顯示一個範例:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Databricks Runtime 14.0 中的行為變更foreachBatch

在以共用存取模式設定的計算上,在 Databricks Runtime 14.0 和更新版本中, forEachBatch 於 Apache Spark 上以個別隔離的 Python 進程執行,而不是在 REPL 環境中執行。 它會串行化並推送至Spark,而且在會話期間無法存取全域 spark 物件。

在所有其他計算組態中, foreachBatch 會在執行其餘程序代碼的相同 Python REPL 中執行。 因此,函式不會串行化。

當您在以共用存取模式設定的計算上使用 Databricks Runtime 14.0 和更新版本時,您必須在 sparkSession Python 中使用 foreachBatch 範圍設定為本機 DataFrame 的變數,如下列程式代碼範例所示:

def example_function(df, batch_id):
  df.sparkSession.sql("<query>")

套用下列行為變更:

  • 您無法從函式記憶體取任何全域 Python 變數。
  • print() 命令會將輸出寫入驅動程序記錄。
  • 函式中參考的任何檔案、模組或對象都必須可串行化,且可在Spark上使用。

重複使用現有的批次數據源

您可以使用 foreachBatch(),針對可能沒有結構化串流支持的數據接收器,使用現有的批次數據寫入器。 以下是一些範例:

您可以從 使用 foreachBatch()許多其他批次數據來源。 請參閱數據源 連線。

寫入多個位置

如果您需要將串流查詢的輸出寫入多個位置,Databricks 建議使用多個結構化串流寫入器來獲得最佳平行處理和輸送量。

使用 foreachBatch 來寫入多個接收會串行化串流寫入的執行,這可能會增加每個微批次的延遲。

如果您使用 foreachBatch 寫入多個 Delta 數據表,請參閱 foreachBatch 中的等冪數據表寫入。