update_flow

Important

update_flow API はパブリック プレビュー段階です

@dp.update_flowデコレーターを使用して更新フローを作成します。 更新フローは、更新出力モードを使用してシンクへの書き込みを行い、各バッチで変更された行のみを出力します。 追加フローとは異なり、ウォーターマークを必要とせずにステートフル集計をサポートします。

更新フローはシンクのみをターゲットにすることができます。 デルタ テーブルはサポートされていません。

Syntax

from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
  target = "<sink-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>", # optional
  import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
  return (<streaming-query>)

Parameters

パラメーター タイプ Description
関数 function 必須。 ユーザー定義クエリから Apache Spark ストリーミング DataFrame を返す関数。
target str 必須。 このフローが書き込むシンクの名前。
name str フロー名。 指定しない場合は、既定で関数名が使用されます。
comment str フローの説明。
spark_conf dict このクエリを実行するための Spark 構成のディクテーション。 これらの構成は、宛先、パイプライン、またはクラスターに設定された conf をオーバーライドします。
import_checkpoint str フローを開始する前にインポートする外部チェックポイント パス。 フローのチェックポイント ディレクトリがまだ存在しない場合に、1 回だけインポートされます。

例示

Kafka シンクへの集計

ステートフル集計結果を Kafka シンクに書き込みます。

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type")
            .groupBy(col("event_type"))
            .count()
    )

リアルタイム モード

spark_confを使用して、リアルタイム モードの更新フローを構成します。

from pyspark import pipelines as dp

dp.create_sink("my_kafka_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
    )

制限事項

  • 差分テーブル シンクは、更新フローのターゲットとしてサポートされていません。