append_flow

Dekoratören @dp.append_flow skapar tilläggsflöden eller påfyllningar för dina pipelinetabeller. Funktionen måste returnera en Apache Spark streaming-Dataram. Se Läsa in och bearbeta data successivt med Lakeflow Spark Declarative Pipelines flows.

Tilläggsflöden kan riktas mot strömmande tabeller eller mottagare.

Syntax

from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <bool>, # optional, defaults to false
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>) #

Parameterar

Parameter Typ Description
function function Obligatoriskt. En funktion som returnerar en Apache Spark-strömmande DataFrame från en användardefinierad fråga.
target str Obligatoriskt. Namnet på den tabell eller mottagare som är målet för tilläggsflödet.
name str Flödets namn Om det inte anges används funktionsnamnet som standard.
once bool Du kan också definiera flödet som ett engångsflöde, till exempel en återfyllnad. Om du använder once=True ändras flödet på två sätt:
  • Returvärdet. streaming-query. måste vara en batchdataram i det här fallet, inte en strömmande dataram.
  • Flödet körs en gång som standard. Om pipelinen uppdateras med en komplett uppdatering, så körs flödet igen för att återskapa data.
comment str En beskrivning av flödet.
spark_conf dict En lista över Spark-konfigurationer för körning av den här frågan

Examples

from pyspark import pipelines as dp

# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
    return (
      spark.read
      .format("json")
      .load("/path/to/backfill/")
    )

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))