Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Der @dp.append_flow Dekorateur erstellt Anfügeflüsse oder Rückfüllungen für Ihre Pipelinetabellen. Die Funktion muss einen Apache Spark Streaming DataFrame zurückgeben. Siehe Laden und Verarbeiten von Daten inkrementell mit Lakeflow Spark Declarative Pipelines flows.
Anfügeabläufe können auf Streaming-Tabellen oder Senken abzielen.
Syntax
from pyspark import pipelines as dp
dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dp.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
once = <bool>, # optional, defaults to false
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming-query>) #
Die Parameter
| Parameter | Typ | Description |
|---|---|---|
| Funktion | function |
Erforderlich. Eine Funktion, die einen Apache Spark Streaming DataFrame aus einer benutzerdefinierten Abfrage zurückgibt. |
target |
str |
Erforderlich. Der Name der Tabelle oder der Senke, die das Ziel des Anfügevorgangs ist. |
name |
str |
Der Flussname. Wenn nicht angegeben, wird standardmäßig der Funktionsname verwendet. |
once |
bool |
Definieren Sie optional den Fluss als einmaligen Ablauf, z. B. als Rückfüllvorgang. Durch die Verwendung von once=True wird der Fluss auf zwei Arten verändert:
|
comment |
str |
Eine Beschreibung für den Ablauf. |
spark_conf |
dict |
Eine Liste der Spark-Konfigurationen für die Ausführung dieser Abfrage |
Examples
from pyspark import pipelines as dp
# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
return (
spark.read
.format("json")
.load("/path/to/backfill/")
)
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))