共用方式為


使用 ForEachBatch 來寫入管線中的任意資料匯

這很重要

API foreach_batch_sink 處於 公開預覽狀態

ForEachBatch 接收器允許你將串流作為微批次來處理。 每個批次都可以在 Python 中處理,並使用類似 Apache Spark 結構化串流的 foreachBatch自訂邏輯。 透過 Lakeflow Spark 宣告式管線(SDP)ForEachBatch 匯入器,你可以將串流資料轉換、合併或寫入到一個或多個原生不支援串流寫入的目標。 本頁面將引導你如何配置 ForEachBatch 接收器,提供範例,並討論關鍵考量。

ForEachBatch sink 提供以下功能:

  • 每個微批次的自訂邏輯:ForEachBatch 是一個靈活的串流匯入器。 你可以用 Python 程式碼套用任意動作(例如合併到外部資料表、寫入多個目的地,或執行 upserts)。
  • 完整刷新支援:管線會依每個流程管理檢查點,因此當你完整刷新管線時,檢查點會自動重置。 使用 ForEachBatch sink,當發生這種情況時,你負責管理下游的資料重置。
  • Unity 目錄支援:ForEachBatch sink 支援所有 Unity 目錄功能,例如從 Unity 目錄卷或資料表讀取或寫入。
  • 有限的整理:管線無法追蹤從 ForEachBatch 資料接收端寫入的資料,因此無法清理這些資料。 你要負責任何下游的資料管理。
  • 事件日誌條目:管線事件日誌記錄每個 ForEachBatch 匯入器的建立與使用情況。 如果你的 Python 函式無法序列化,你會在事件日誌中看到警告條目和額外的建議。

備註

  • ForEachBatch sink 是為串流查詢設計的,例如 append_flow。 它不適用於僅批次處理的管線或 AutoCDC 語法。
  • 本頁描述的 ForEachBatch 匯入是用於管線。 Apache Spark 結構化串流也支援 foreachBatch。 如需結構化串流 foreachBatch 的相關資訊,請參閱 使用 foreachBatch 將資料寫入任意資料接收端

何時使用 ForEachBatch 水槽

當您的管線需要透過內建匯入格式(如 deltakafka)無法提供的功能時,請使用 ForEachBatch 匯入點。 一般使用案例包括:

  • 合併或上傳到 Delta Lake 資料表:為每個微批次執行自訂合併邏輯(例如處理更新紀錄)。
  • 寫入多個或不支援的端點:將每個批次的輸出寫入多個不支援串流寫入的資料表或外部儲存系統(如某些 JDBC 接收器)。
  • 應用自訂邏輯或轉換:直接在 Python 中操作資料(例如使用專門函式庫或進階轉換)。

關於內建接收器或使用 Python 建立自訂接收器的資訊,請參閱 Lakeflow Spark 宣告管線中的接收器

語法

使用 @dp.foreach_batch_sink() 裝飾來生成 ForEachBatch 資料接收器。 你可以在流程定義中作為target來引用,例如在@dp.append_flow中。

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
    """
    Required:
      - `df`: a Spark DataFrame representing the rows of this micro-batch.
      - `batch_id`: unique integer ID for each micro-batch in the query.
    """
    # Your custom write or transformation logic here
    # Example:
    # df.write.format("some-target-system").save("...")
    #
    # To access the sparkSession inside the batch handler, use df.sparkSession.
參數 Description
name 選擇性。 一個獨特的名稱用來識別管線內的匯。 若未包含,則預設為UDF名稱。
batch_handler 這是每個微批次都會被呼叫的使用者自訂函式(UDF)。
df Spark DataFrame 包含目前微批次的資料。
batch_id 微批次的整數 ID。 Spark 會對每個觸發間隔遞增這個 ID。
A batch_id 代表 0 串流的開始,或是整體刷新作業的起始。 程式碼 foreach_batch_sink 應該能正確處理下游資料來源的完整刷新。 更多資訊請見下一節。

完整重新整理

由於 ForEachBatch 使用串流查詢,管線會追蹤每個資料流的檢查點目錄。 全刷新後:

  • 檢查點目錄會被重置。
  • 你的接收函數(foreach_batch_sink UDF)會看到一個全新的 batch_id 循環,從零開始。
  • 你目標系統中的資料 不會 被管線自動清理(因為管線不知道你的資料寫入位置)。 如果你需要一個全新情況,你必須手動刪除或截斷 ForEachBatch 匯入的外部資料表或位置。

使用 Unity 目錄功能

Spark 結構化串流 foreach_batch_sink 中所有現有的 Unity 目錄功能仍然可用。

這包括寫入受管理的或外部的 Unity Catalog 中的資料表。 你可以像在任何 Apache Spark 結構化串流工作中一樣,將微批次寫入 Unity Catalog 管理或外部資料表。

事件日誌條目

當你建立 ForEachBatch sink 時,會將一個 SinkDefinition 事件與"format": "foreachBatch" 加入管線的事件日誌。

這讓你能追蹤 ForEachBatch sink 的使用情況,並查看有關 sink 的警告。

使用 Databricks Connect

如果你提供的函式 無法序列化 (這是 Databricks Connect 的重要需求),事件日誌中會 WARN 包含一條建議你簡化或重構程式碼的條目,若需要 Databricks Connect 支援。

例如,如果你在 ForEachBatch UDF 中使用 dbutils 來取得參數,你也可以選擇在使用 UDF 之前先取得這些參數。

# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
  value = dbutils.widgets.get ("X") + str (i)

# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get ("X")

def foreach_batch(df, batchId):
  value = argX + str (i)

最佳做法

  1. 保持 ForEachBatch 函式簡潔:避免執行緒、大量函式庫依賴或大量記憶體資料操作。 複雜或有狀態邏輯可能導致序列化錯誤或效能瓶頸。
  2. 監控你的檢查點資料夾:對於串流查詢,SDP 是依流程管理檢查點,而不是依匯入端。 如果你的管線中有多個流程,每個流程都有自己的檢查點目錄。
  3. 驗證外部相依性:如果你依賴外部系統或函式庫,請檢查它們是否安裝在所有叢集節點或你的容器中。
  4. 注意 Databricks Connect:如果你的環境未來可能會遷移到 Databricks Connect,請檢查你的程式碼是否可序列化,且在 UDF 中不依賴 dbutilsforeach_batch_sink

局限性

  • ForEachBatch 無法執行清理:因為你的自訂 Python 程式碼可能將資料寫入任何位置,導致管線無法清理或追蹤該資料。 你必須自行管理你寫入目的地的資料管理或保留政策。
  • 微批次中的指標:管線會收集串流指標,但某些情境下使用 ForEachBatch 可能導致指標不完整或異常。 這是因為 ForEachBatch 的底層彈性,使得系統難以追蹤資料流與列數。
  • 支援在無多次讀取的情況下寫入多個目的地:有些客戶可能會使用 ForEachBatch 從來源讀取一次,然後再寫入多個目的地。 為了達成此目標,您必須在 ForEachBatch 函式中包含 df.persistdf.cache。 透過這些選項,Azure Databricks 只會嘗試準備一次資料。 沒有這些選項,你的查詢會導致多次閱讀。 以下程式碼範例中未包含此功能。
  • 搭配 Databricks Connect 使用:如果您的管線在 Databricks Connect 上運行, foreachBatch 使用者定義函數(UDF)必須可序列化,且不能使用 dbutils。 如果管線偵測到無法序列化的 UDF,會發出警告,但不會讓管線失敗。
  • 不可序列化邏輯:引用本地物件、類別或無法使用 pickle 模組序列化的資源的程式碼,可能會在 Databricks Connect 環境中出問題。 使用純 Python 模組,並確認如果 Databricks Connect 是必需品,則不使用參考資料(例如 dbutils)。

範例

基本語法範例

from pyspark import pipelines as dp

# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
  # Custom logic here. You can perform merges,
  # write to multiple destinations, etc.
  return

# Create source data for example:
@dp.table()
def example_source_data():
  return spark.range(5)

# Add sink to an append flow:
@dp.append_flow(
    target="my_foreachbatch_sink",
)
def my_flow():
  return spark.readStream.format("delta").table("example_source_data")

使用樣本資料製作簡單管線

這個例子使用了紐約市計程車的樣本。 它假設你的工作區管理員已啟用 Databricks 的 Public Datasets 目錄。 對於水槽,請切換 my_catalog.my_schema 到你能存取的目錄和結構。

from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
    # Custom logic here. You can perform merges,
    # write to multiple destinations, etc.
    # For this example, we are adding a timestamp column.
    enriched = df.withColumn("processed_timestamp", current_timestamp())
    # Write to a Delta location
    enriched.write \
      .format("delta") \
      .mode("append") \
      .saveAsTable("my_catalog.my_schema.trips_sink_delta")
    # Return is optional here, but generally not used for the sink
    return

# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
    target="my_foreach_sink",
)
def taxi_source():
  df = spark.readStream.table("samples.nyctaxi.trips")
  return df

寫入多個目的地

這個範例會寫入多個目的地。 它展示了如何使用 txnVersiontxnAppId 使對 Delta Lake 資料表的寫入成為冪等。 詳情請參見 冪等性表寫入 foreachBatch

假設我們寫入兩個資料表 table_atable_b,而在同一個批次中,假如寫入 table_a 成功但是寫入 table_b 失敗。 當批次重新執行時,(txnVersiontxnAppId) 對會允許 Delta 忽略重複寫入 table_a,只將批次寫入 table_b

from pyspark import pipelines as dp

app_id = "my-app-name" # different applications that write to the same table should have unique txnAppId

# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
    # Optionally do transformations, logging, or merging logic
    # ...

    # Write to a Delta table
    df.write \
     .format("delta") \
     .mode("append") \
     .option("txnVersion", batch_id) \
     .option("txnAppId", app_id) \
     .saveAsTable("my_catalog.my_schema.example_table_1")

    # Also write to a JSON file location
    df.write \
      .format("json") \
      .mode("append") \
      .option("txnVersion", batch_id) \
      .option("txnAppId", app_id) \
      .save("/tmp/json_target")
    return

# Create source data for example
@dp.table()
def example_source():
  return spark.range(5)


# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
    return spark.readStream.format("delta").table("example_source")

使用 spark.sql()

你可以在 ForEachBatch sink 中使用 spark.sql() ,就像以下範例一樣。

from pyspark import pipelines as dp
from pyspark.sql import Row

@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
  df.createOrReplaceTempView("df_view")
  df.sparkSession.sql("MERGE INTO target_table AS tgt " +
            "USING df_view AS src ON tgt.id = src.id " +
            "WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
            "WHEN NOT MATCHED THEN INSERT (id) VALUES (id)"
          )
  return

# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")

# Create source table
@dp.table()
def src_table():
  return spark.range(5)

@dp.append_flow(
    target="example_sink",
)
def example_flow():
  return spark.readStream.format("delta").table("source_table")

常見問題 (FAQ)

我可以在我的 ForEachBatch 水槽裡使用 dbutils 嗎?

如果你打算在非 Databricks Connect 環境下運行管線, dbutils 或許可以行。 不過,如果你使用 Databricks Connect, dbutils 則無法在你的 foreachBatch 函式中存取。 管線若偵測到 dbutils 使用情況,可能會發出警告,以協助你避免損壞。

我可以在單一的 ForEachBatch 接收器中使用多個流程嗎?

是的。 你可以定義多個流程(使用 @dp.append_flow),這些流程雖然都針對相同的匯集名稱,但它們各自維持自己的檢查點。

資料處理管道會處理我的目標的資料保留或清理嗎?

否。 由於 ForEachBatch 匯入器可寫入任意位置或系統,管線無法自動管理或刪除該目標中的資料。 你必須將這些操作視為自訂程式碼或外部流程的一部分來處理。

我該如何排除 ForEachBatch 函式中的序列化錯誤或失敗?

查看你的叢集驅動程式日誌或管線事件日誌。 針對 Spark Connect 相關的序列化問題,請檢查你的函式是否僅依賴可序列化的 Python 物件,且不參考被允許的物件(如開啟的檔案 handle 或 dbutils)。