Not
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Dekoratören @dp.append_flow skapar tilläggsflöden eller påfyllningar för dina pipelinetabeller. Funktionen måste returnera en Apache Spark streaming-Dataram. Se Läsa in och bearbeta data successivt med Lakeflow Spark Declarative Pipelines flows.
Tilläggsflöden kan riktas mot strömmande tabeller eller mottagare.
Syntax
from pyspark import pipelines as dp
dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dp.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
once = <bool>, # optional, defaults to false
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming-query>) #
Parameterar
| Parameter | Typ | Description |
|---|---|---|
| function | function |
Obligatoriskt. En funktion som returnerar en Apache Spark-strömmande DataFrame från en användardefinierad fråga. |
target |
str |
Obligatoriskt. Namnet på den tabell eller mottagare som är målet för tilläggsflödet. |
name |
str |
Flödets namn Om det inte anges används funktionsnamnet som standard. |
once |
bool |
Du kan också definiera flödet som ett engångsflöde, till exempel en återfyllnad. Om du använder once=True ändras flödet på två sätt:
|
comment |
str |
En beskrivning av flödet. |
spark_conf |
dict |
En lista över Spark-konfigurationer för körning av den här frågan |
Examples
from pyspark import pipelines as dp
# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
return (
spark.read
.format("json")
.load("/path/to/backfill/")
)
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))