Condividi tramite


append_flow

Il @dp.append_flow Decorator crea flussi di aggiunta o backfill per le tabelle della pipeline. La funzione deve restituire un dataframe di streaming Apache Spark. Vedere Caricare ed elaborare i dati in modo incrementale con i flussi dichiarativi di Lakeflow Spark.

I flussi di accodamento possono essere destinati a tabelle o sink di streaming.

Sintassi

from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <bool>, # optional, defaults to false
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>) #

Parametri

Parametro TIPO Description
funzione function Obbligatorio. Funzione che restituisce un dataframe di streaming Apache Spark da una query definita dall'utente.
target str Obbligatorio. Nome della tabella o del sink che rappresenta la destinazione del flusso di accodamento.
name str Nome del flusso. Se non specificato, per impostazione predefinita viene impostato il nome della funzione.
once bool Si può facoltativamente definire il flusso come un flusso monouso, ad esempio un riempimento retroattivo. L'uso di once=True modifica il flusso in due modi:
  • Il valore di ritorno. streaming-query. deve essere un dataframe batch in questo caso, non un dataframe di streaming.
  • Il flusso viene eseguito una sola volta per impostazione predefinita. Se la pipeline viene aggiornata con un aggiornamento completo, il ONCE flusso viene eseguito di nuovo per ricreare i dati.
comment str Descrizione del flusso.
spark_conf dict Elenco delle configurazioni di Spark per l'esecuzione di questa query

Esempi

from pyspark import pipelines as dp

# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
    return (
      spark.read
      .format("json")
      .load("/path/to/backfill/")
    )

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))