update_flow

Important

API update_flow 處於 公開預覽狀態

使用 @dp.update_flow 裝飾工具建立更新流程。 更新流程會以更新輸出模式寫入匯入,只輸出每批次中變動的列。 與 附加流不同,它們支援狀態彙整且不需水印。

更新流程只能針對匯。 不支援 Delta 表格。

Syntax

from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
  target = "<sink-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>", # optional
  import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
  return (<streaming-query>)

Parameters

參數 類型 Description
函式 function Required. 用來從使用者定義的查詢中返回 Apache Spark 串流 DataFrame 的函數。
target str Required. 這股流寫入的匯的名稱。
name str 流程名稱。 如果未提供,則預設為函式名稱。
comment str 對流程的描述。
spark_conf dict 執行此查詢的 Spark 配置指令。 這些配置會覆蓋目的地、管線或叢集所設定的 conf。
import_checkpoint str 一個外部檢查點路徑,讓它在開始流程前匯入。 僅匯入一次,當流程的檢查點目錄尚未存在時。

Examples

聚合到 Kafka 匯

將有狀態的聚合結果寫入 Kafka 匯:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type")
            .groupBy(col("event_type"))
            .count()
    )

即時模式

用於 spark_conf 設定 即時模式的更新流程:

from pyspark import pipelines as dp

dp.create_sink("my_kafka_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
    )

Limitations

  • Delta 表格匯入不支援作為更新流程的目標。