Important
update_flow API はパブリック プレビュー段階です。
@dp.update_flowデコレーターを使用して更新フローを作成します。 更新フローは、更新出力モードを使用してシンクへの書き込みを行い、各バッチで変更された行のみを出力します。
追加フローとは異なり、ウォーターマークを必要とせずにステートフル集計をサポートします。
更新フローはシンクのみをターゲットにすることができます。 デルタ テーブルはサポートされていません。
Syntax
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parameters
| パラメーター | タイプ | Description |
|---|---|---|
| 関数 | function |
必須。 ユーザー定義クエリから Apache Spark ストリーミング DataFrame を返す関数。 |
target |
str |
必須。 このフローが書き込むシンクの名前。 |
name |
str |
フロー名。 指定しない場合は、既定で関数名が使用されます。 |
comment |
str |
フローの説明。 |
spark_conf |
dict |
このクエリを実行するための Spark 構成のディクテーション。 これらの構成は、宛先、パイプライン、またはクラスターに設定された conf をオーバーライドします。 |
import_checkpoint |
str |
フローを開始する前にインポートする外部チェックポイント パス。 フローのチェックポイント ディレクトリがまだ存在しない場合に、1 回だけインポートされます。 |
例示
Kafka シンクへの集計
ステートフル集計結果を Kafka シンクに書き込みます。
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
リアルタイム モード
spark_confを使用して、リアルタイム モードの更新フローを構成します。
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
制限事項
- 差分テーブル シンクは、更新フローのターゲットとしてサポートされていません。