foreachBatch(DataStreamWriter)

設定串流查詢的輸出,使用提供的函式進行處理。 僅支援微批次執行模式(即觸發條件非連續時)。 在每個微批次中,所提供的函式會以輸出列作為資料框架(DataFrame)及批次識別碼(batch identifier)被呼叫。 批次 ID 可用於去重並將輸出交易寫入外部系統。

語法

foreachBatch(func)

參數

參數 類型 說明
func 可通話 一個以 DataFrame 和批次 ID(int)作為輸入的函式。

退貨

DataStreamWriter

Notes

在 Spark Connect 模式下,所提供的函式無法存取外部定義的變數。

Examples

import time
df = spark.readStream.format("rate").load()

def func(batch_df, batch_id):
    batch_df.collect()

q = df.writeStream.foreachBatch(func).start()
time.sleep(3)
q.stop()