Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Il @dp.append_flow Decorator crea flussi di aggiunta o backfill per le tabelle della pipeline. La funzione deve restituire un dataframe di streaming Apache Spark. Vedere Caricare ed elaborare i dati in modo incrementale con i flussi dichiarativi di Lakeflow Spark.
I flussi di accodamento possono essere destinati a tabelle o sink di streaming.
Sintassi
from pyspark import pipelines as dp
dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dp.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
once = <bool>, # optional, defaults to false
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming-query>) #
Parametri
| Parametro | TIPO | Description |
|---|---|---|
| funzione | function |
Obbligatorio. Funzione che restituisce un dataframe di streaming Apache Spark da una query definita dall'utente. |
target |
str |
Obbligatorio. Nome della tabella o del sink che rappresenta la destinazione del flusso di accodamento. |
name |
str |
Nome del flusso. Se non specificato, per impostazione predefinita viene impostato il nome della funzione. |
once |
bool |
Si può facoltativamente definire il flusso come un flusso monouso, ad esempio un riempimento retroattivo. L'uso di once=True modifica il flusso in due modi:
|
comment |
str |
Descrizione del flusso. |
spark_conf |
dict |
Elenco delle configurazioni di Spark per l'esecuzione di questa query |
Esempi
from pyspark import pipelines as dp
# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
return (
spark.read
.format("json")
.load("/path/to/backfill/")
)
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))