Freigeben über


append_flow

Der @dp.append_flow Dekorateur erstellt Anfügeflüsse oder Rückfüllungen für Ihre Pipelinetabellen. Die Funktion muss einen Apache Spark Streaming DataFrame zurückgeben. Siehe Laden und Verarbeiten von Daten inkrementell mit Lakeflow Spark Declarative Pipelines flows.

Anfügeabläufe können auf Streaming-Tabellen oder Senken abzielen.

Syntax

from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <bool>, # optional, defaults to false
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>) #

Die Parameter

Parameter Typ Description
Funktion function Erforderlich. Eine Funktion, die einen Apache Spark Streaming DataFrame aus einer benutzerdefinierten Abfrage zurückgibt.
target str Erforderlich. Der Name der Tabelle oder der Senke, die das Ziel des Anfügevorgangs ist.
name str Der Flussname. Wenn nicht angegeben, wird standardmäßig der Funktionsname verwendet.
once bool Definieren Sie optional den Fluss als einmaligen Ablauf, z. B. als Rückfüllvorgang. Durch die Verwendung von once=True wird der Fluss auf zwei Arten verändert:
  • Der Rückgabewert. streaming-query. In diesem Fall muss es sich um einen statischen DataFrame handeln, nicht um einen Streaming-DataFrame.
  • Der Fluss wird standardmäßig einmal ausgeführt. Wenn die Pipeline mit einer vollständigen Aktualisierung aktualisiert wird, wird der ONCE Fluss erneut ausgeführt, um die Daten neu zu erstellen.
comment str Eine Beschreibung für den Ablauf.
spark_conf dict Eine Liste der Spark-Konfigurationen für die Ausführung dieser Abfrage

Examples

from pyspark import pipelines as dp

# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
    return (
      spark.read
      .format("json")
      .load("/path/to/backfill/")
    )

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))