Delen via


append_flow

De @dp.append_flow decorator maakt toevoegstromen of backfills voor uw pijplijntabellen. De functie moet een Apache Spark-streaming dataframe retourneren. Zie Incrementeel gegevens laden en verwerken met Lakeflow Spark-declaratieve pijplijnenstromen.

Toevoegstromen kunnen gericht zijn op streamingtabellen of sinks.

Syntaxis

from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <bool>, # optional, defaults to false
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>) #

Parameterwaarden

Kenmerk Typologie Description
function function Verplicht. Een functie die een Streaming DataFrame van Apache Spark retourneert vanuit een door de gebruiker gedefinieerde query.
target str Verplicht. De naam van de tabel of sink die het doel is van de toevoegstroom.
name str De naam van de stroom. Als deze niet is opgegeven, wordt standaard de functienaam gebruikt.
once bool U kunt de stroom desgewenst definiëren als een eenmalige stroom, zoals een backfill. Door once=True te gebruiken verandert de stroom op twee manieren:
  • De retourwaarde. streaming-query. moet in dit geval een Batch DataFrame zijn, niet een streaming DataFrame.
  • De stroom wordt standaard één keer uitgevoerd. Als de pijplijn wordt bijgewerkt met een volledige vernieuwing, wordt de ONCE stroom opnieuw uitgevoerd om de gegevens opnieuw te maken.
comment str Een beschrijving voor het proces.
spark_conf dict Een lijst met Spark-configuraties voor de uitvoering van deze query

Voorbeelden

from pyspark import pipelines as dp

# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
    return (
      spark.read
      .format("json")
      .load("/path/to/backfill/")
    )

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))