append_flow

The @dlt.append_flow decorator creates append flows or backfills for your Lakeflow Declarative Pipelines tables. The decorated function must return an Apache Spark streaming DataFrame. See Load and process data incrementally with Lakeflow Declarative Pipelines flows.

Append flows can target streaming tables or sinks.
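
For example, append flows let several sources feed the same streaming table without combining them into a single query. A minimal sketch of that pattern, assuming two hypothetical source tables named orders_us and orders_eu already exist:

import dlt

dlt.create_streaming_table("all_orders")

# Each flow appends its own source stream to the shared target table.
@dlt.append_flow(target = "all_orders")
def orders_us_flow():
  return spark.readStream.table("orders_us")

@dlt.append_flow(target = "all_orders")
def orders_eu_flow():
  return spark.readStream.table("orders_eu")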

Syntax

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <boolean>, # optional, defaults to false
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>)

Parameters

Parameter   Type      Description
function    function  Required. A function that returns an Apache Spark streaming DataFrame from a user-defined query.
target      str       Required. The name of the table or sink that is the target of the append flow.
name        str       The flow name. If not provided, defaults to the function name.
once        bool      Optionally, define the flow as a one-time flow, such as a backfill. Using once=True changes the flow in two ways:
                      • The return value, <streaming-query>, must be a batch DataFrame, not a streaming DataFrame.
                      • The flow runs one time by default. If the pipeline is updated with a full refresh, the ONCE flow runs again to recreate the data.
comment     str       A description for the flow.
spark_conf  dict      A dictionary of Spark configurations for the execution of this query.
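
For instance, a brief sketch combining the optional parameters; the target and source table names here (events, events_raw) are hypothetical:

import dlt

dlt.create_streaming_table("events")

@dlt.append_flow(
  target = "events",
  name = "events_flow", # overrides the default name (the function name)
  spark_conf = {"spark.sql.shuffle.partitions": "64"}, # applies only to this flow's query
  comment = "Continuously append raw events.")
def ingest_events():
  return spark.readStream.table("events_raw")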

Examples

import dlt
from pyspark.sql import functions as F

# Create a sink for an external Delta table
dlt.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow that writes to the Delta sink
@dlt.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return spark.readStream.table("source_table") # any streaming query; source_table is a placeholder

# Add a one-time backfill; with once=True the function returns a batch DataFrame
@dlt.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
  return (
    spark.read
    .format("json")
    .load("/path/to/backfill/")
  )

# Create a Kafka sink
dlt.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to the Kafka sink
@dlt.append_flow(name = "kafka_flow", target = "my_kafka_sink")
def myFlow():
  # dlt.read_stream reads a dataset defined in this pipeline; "xxx" is a placeholder
  return dlt.read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))
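
The select into a value column matters here: as with Spark's built-in Kafka writer, the Kafka sink takes the record payload from the value column, so the streaming query must produce one. Serializing all columns to JSON with to_json(struct("*")) is one common way to do that.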