update_flow

Important

The update_flow API is in Public Preview.

Use the @dp.update_flow decorator to create an update flow. Update flows write to sinks using update output mode, emitting only the rows that changed in each batch. Unlike append flows, they support stateful aggregations without requiring a watermark.
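The plain-Python simulation below shows how update output differs from complete output. It does not use the Spark API, and the function names are hypothetical. It keeps a running count per key across micro-batches. In update mode, each batch emits only the keys whose count changed. The complete-mode function is included for contrast and re-emits every key after each batch.

```python
from collections import Counter
from typing import Dict, Iterable, List


def run_update_mode(batches: Iterable[List[str]]) -> List[Dict[str, int]]:
    """Running count per key; each batch emits only the keys whose count changed."""
    state: Counter = Counter()  # aggregation state carried across micro-batches
    outputs: List[Dict[str, int]] = []
    for batch in batches:
        state.update(batch)  # fold this batch's rows into the running counts
        # Update mode: emit only the keys that appeared in this batch,
        # since those are exactly the keys whose count changed.
        outputs.append({key: state[key] for key in sorted(set(batch))})
    return outputs


def run_complete_mode(batches: Iterable[List[str]]) -> List[Dict[str, int]]:
    """For contrast: complete mode re-emits every key's count after each batch."""
    state: Counter = Counter()
    outputs: List[Dict[str, int]] = []
    for batch in batches:
        state.update(batch)
        outputs.append(dict(sorted(state.items())))
    return outputs


if __name__ == "__main__":
    batches = [["click", "view", "click"], ["view"], ["purchase", "click"]]
    for update_out, complete_out in zip(run_update_mode(batches), run_complete_mode(batches)):
        print("update:", update_out, "| complete:", complete_out)
```

In this run, the second batch sends only the new count for "view" to the sink in update mode. Complete mode repeats the unchanged count for "click" as well.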

Update flows can only target sinks created with dp.create_sink(). Delta table sinks are not supported.

Syntax

from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
  target = "<sink-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>", # optional
  import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
  return (<streaming-query>)

Parameters

  • function (function): Required. A function that returns an Apache Spark streaming DataFrame from a user-defined query.
  • target (str): Required. The name of the sink this flow writes to.
  • name (str): Optional. The flow name. Defaults to the function name.
  • comment (str): Optional. A description of the flow.
  • spark_conf (dict): Optional. A dict of Spark configurations used to execute this query. These configurations override those set for the destination, pipeline, or cluster.
  • import_checkpoint (str): Optional. An external checkpoint path to import before the flow starts. The checkpoint is imported only once, when the flow's checkpoint directory does not yet exist.
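The one-time import rule for import_checkpoint comes down to an existence check. The plain-Python sketch below models only that decision. The function prepare_checkpoint and the directory layout are hypothetical. Copying the files with shutil.copytree is an assumption made for illustration and does not describe how Databricks performs the import.

```python
import os
import shutil
import tempfile
from typing import Optional


def prepare_checkpoint(flow_checkpoint_dir: str, import_checkpoint: Optional[str]) -> bool:
    """Seed a flow's checkpoint from an external path, but only on first start.

    Returns True if the external checkpoint was imported, False otherwise.
    """
    if import_checkpoint is None:
        return False  # no import requested
    if os.path.exists(flow_checkpoint_dir):
        return False  # the flow already has a checkpoint: never import over it
    # Assumption for illustration: "import" means copying the external checkpoint into place.
    shutil.copytree(import_checkpoint, flow_checkpoint_dir)
    return True


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as root:
        external = os.path.join(root, "legacy_query_checkpoint")
        os.makedirs(os.path.join(external, "offsets"))  # stand-in for real checkpoint contents
        flow_dir = os.path.join(root, "checkpoints", "event_counts_flow")
        print(prepare_checkpoint(flow_dir, external))  # first start: imports the checkpoint
        print(prepare_checkpoint(flow_dir, external))  # restart: directory exists, import skipped
```

Because the check runs against the flow's own checkpoint directory, later restarts resume from the flow's checkpoint even if import_checkpoint is still set.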

Examples

Aggregation to a Kafka sink

Write stateful aggregation results to a Kafka sink:

from pyspark import pipelines as dp
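from pyspark.sql.functions import col

# Placeholder connection settings; replace with your own values.
# `spark` is provided by the pipeline runtime.
broker_address = "<broker-host:port>"
input_topic = "<input-topic>"
output_topic = "<output-topic>"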

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type")
            .groupBy(col("event_type"))
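            .count()
            # The Kafka sink requires a `value` column: send the event type as the
            # message key and the updated count as a string value.
            .select(
                col("event_type").alias("key"),
                col("count").cast("string").alias("value"),
            )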
    )
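This flow emits a new record for an event type each time its count changes. As a result, output_topic accumulates several messages per key, and the most recent message holds the current count. A downstream consumer can rebuild the current totals with a last-write-wins fold.

The sketch below is plain Python over already-decoded (key, value) pairs. It assumes the count is encoded as a string, omits the Kafka client code, and uses a hypothetical function name, latest_counts. It also relies on messages for a given key arriving in write order. Kafka guarantees ordering within a partition, and the producer's default partitioner sends records with the same key to the same partition.

```python
from typing import Dict, Iterable, Tuple


def latest_counts(messages: Iterable[Tuple[str, str]]) -> Dict[str, int]:
    """Collapse update-mode messages into the current count per key (last write wins)."""
    current: Dict[str, int] = {}
    for key, value in messages:  # assumed to be in topic order for each key
        current[key] = int(value)  # a later update for the same key replaces the earlier one
    return current


if __name__ == "__main__":
    messages = [("click", "2"), ("view", "1"), ("view", "2"), ("click", "3"), ("purchase", "1")]
    print(latest_counts(messages))
```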

Real-time mode

Use spark_conf to configure an update flow for real-time mode:

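from pyspark import pipelines as dp

# Placeholder connection settings; replace with your own values.
# `spark` is provided by the pipeline runtime.
broker_address = "<broker-host:port>"
input_topic = "<input-topic>"
output_topic = "<output-topic>"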

dp.create_sink("my_kafka_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
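            .load()
            # Forward only the message key and value; the sink's "topic" option
            # selects the output topic.
            .select("key", "value")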
    )
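The spark_conf entries in this example take precedence over any values set for the same keys on the destination, pipeline, or cluster. The plain-Python sketch below models that rule as a dictionary merge in which the flow's entries win. It is not how Databricks resolves configurations internally. The inherited settings and the function name effective_conf are hypothetical.

```python
from typing import Dict


def effective_conf(inherited: Dict[str, str], flow_conf: Dict[str, str]) -> Dict[str, str]:
    """Combine inherited settings with a flow's spark_conf; flow entries win on conflict."""
    merged = dict(inherited)  # copy so the inherited settings are not mutated
    merged.update(flow_conf)  # flow-level values override inherited values for the same key
    return merged


if __name__ == "__main__":
    # Hypothetical settings inherited from the pipeline or cluster.
    inherited = {"pipelines.trigger.interval": "1 hour", "spark.sql.shuffle.partitions": "200"}
    # The spark_conf passed to @dp.update_flow in the real-time example.
    flow_conf = {"pipelines.trigger": "RealTime", "pipelines.trigger.interval": "5 minutes"}
    print(effective_conf(inherited, flow_conf))
```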

Limitations

  • Delta table sinks are not supported as targets for update flows.