設定串流查詢的輸出,使用提供的函式進行處理。 僅支援微批次執行模式(即觸發條件非連續時)。 在每個微批次中,所提供的函式會以輸出列作為資料框架(DataFrame)及批次識別碼(batch identifier)被呼叫。 批次 ID 可用於去重並將輸出交易寫入外部系統。
語法
foreachBatch(func)
參數
| 參數 | 類型 | 說明 |
|---|---|---|
func |
可通話 | 一個以 DataFrame 和批次 ID(int)作為輸入的函式。 |
退貨
DataStreamWriter
Notes
在 Spark Connect 模式下,所提供的函式無法存取外部定義的變數。
Examples
import time
df = spark.readStream.format("rate").load()
def func(batch_df, batch_id):
batch_df.collect()
q = df.writeStream.foreachBatch(func).start()
time.sleep(3)
q.stop()