Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
A @dp.append_flow dekorátor hozzáfűző folyamatokat vagy visszatöltéseket hoz létre a folyamattáblákhoz. A függvénynek Apache Spark streaming DataFrame-et kell visszaadnia. Lásd az adatok növekményes betöltését és feldolgozását a Lakeflow Spark Declarative Pipelines flows segítségével.
A hozzáfűzési folyamok megcélozhatnak streamelési táblákat vagy adatfogadókat.
Szemantika
from pyspark import pipelines as dp
dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dp.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
once = <bool>, # optional, defaults to false
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming-query>) #
Paraméterek
| Paraméter | Típus | Description |
|---|---|---|
| függvény | function |
Szükséges. Egy függvény, amely egy Apache Spark streamelési DataFrame-et ad vissza egy felhasználó által megadott lekérdezésből. |
target |
str |
Szükséges. A hozzáfűzési folyamat céltáblájának vagy kimenetének neve. |
name |
str |
Az adatfolyam neve. Ha nincs megadva, alapértelmezés szerint a függvény neve lesz. |
once |
bool |
Igény szerint definiálja a folyamatot egyszeri folyamatként, például visszatöltésként. A once=True használata kétféleképpen változtatja meg a folyamatot:
|
comment |
str |
A folyamat leírása. |
spark_conf |
dict |
A lekérdezés végrehajtásához szükséges Spark-konfigurációk listája |
Példák
from pyspark import pipelines as dp
# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
return (
spark.read
.format("json")
.load("/path/to/backfill/")
)
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))