Sdílet prostřednictvím


append_flow

Dekorátor @dp.append_flow vytvoří přidávací toky nebo backfilly pro tabulky kanálu. Funkce musí vrátit streamingový datový rámec Apache Spark. Viz Přírůstkové načítání a zpracování dat pomocí toků deklarativních kanálů Sparku Lakeflow.

Přidávací proudy můžou cílit na streamované tabulky nebo úložiště.

Syntaxe

from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <bool>, # optional, defaults to false
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>) #

Parametry

Parameter Typ Description
funkce function Povinné. Funkce, která vrací datový rámec streamování Apache Sparku z uživatelem definovaného dotazu.
target str Povinné. Název tabulky nebo jímky, která je cílem toku připojení.
name str Název toku. Pokud není zadaný, nastaví se výchozí hodnota názvu funkce.
once bool Volitelně můžete tok definovat jako jednorázový tok, například jako backfill. Použití once=True změní tok dvěma způsoby:
  • Vrácená hodnota. streaming-query. v tomto případě musí být datový rámec dávky, nikoli datový rámec streamování.
  • Proud je ve výchozím nastavení spuštěn jednou. Pokud je potrubí aktualizováno o úplnou obnovu, ONCE tok se spustí znovu, aby znovu vytvořil data.
comment str Popis toku
spark_conf dict Seznam konfigurací Sparku pro spuštění tohoto dotazu

Examples

from pyspark import pipelines as dp

# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
    return (
      spark.read
      .format("json")
      .load("/path/to/backfill/")
    )

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))