Important
API update_flow 處於 公開預覽狀態。
使用 @dp.update_flow 裝飾工具建立更新流程。 更新流程會以更新輸出模式寫入匯入,只輸出每批次中變動的列。 與 附加流不同,它們支援狀態彙整且不需水印。
更新流程只能針對匯。 不支援 Delta 表格。
Syntax
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parameters
| 參數 | 類型 | Description |
|---|---|---|
| 函式 | function |
Required. 用來從使用者定義的查詢中返回 Apache Spark 串流 DataFrame 的函數。 |
target |
str |
Required. 這股流寫入的匯的名稱。 |
name |
str |
流程名稱。 如果未提供,則預設為函式名稱。 |
comment |
str |
對流程的描述。 |
spark_conf |
dict |
執行此查詢的 Spark 配置指令。 這些配置會覆蓋目的地、管線或叢集所設定的 conf。 |
import_checkpoint |
str |
一個外部檢查點路徑,讓它在開始流程前匯入。 僅匯入一次,當流程的檢查點目錄尚未存在時。 |
Examples
聚合到 Kafka 匯
將有狀態的聚合結果寫入 Kafka 匯:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
即時模式
用於 spark_conf 設定 即時模式的更新流程:
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
Limitations
- Delta 表格匯入不支援作為更新流程的目標。