Udostępnij za pomocą


append_flow

Dekorator @dp.append_flow tworzy dołączanie przepływów lub uzupełniania dla tabel potoków. Funkcja musi zwrócić strumieniową ramkę danych Apache Spark. Zapoznaj się z przepływami deklaratywnymi potoków Spark w Lakeflow, które umożliwiają przyrostowe ładowanie i przetwarzanie danych.

Dołączanie przepływów może dotyczyć tabel przesyłania strumieniowego lub ujścia.

Składnia

from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <bool>, # optional, defaults to false
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>) #

Parametry

Parameter Typ Description
funkcja function To jest wymagane. Funkcja, która zwraca strumieniowy DataFrame w Apache Spark na podstawie zapytania zdefiniowanego przez użytkownika.
target str To jest wymagane. Nazwa tabeli lub ujścia będącego elementem docelowym przepływu dołączania.
name str Nazwa przepływu. Jeśli nie zostanie podana, wartość domyślna to nazwa funkcji.
once bool Opcjonalnie zdefiniuj przepływ jako przepływ jednorazowy, taki jak wypełnienie wsteczne. Używanie once=True zmienia przepływ na dwa sposoby:
  • Wartość zwracana. streaming-query. w tym przypadku musi być wsadową ramką danych, a nie strumieniową ramką danych.
  • Domyślnie przepływ jest uruchamiany jeden raz. W przypadku zaktualizowania pipeline'u przez pełne odświeżenie, przepływ ONCE zostanie uruchomiony ponownie w celu odtworzenia danych.
comment str Opis przepływu.
spark_conf dict Lista konfiguracji platformy Spark na potrzeby wykonywania tego zapytania

Przykłady

from pyspark import pipelines as dp

# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
    return (
      spark.read
      .format("json")
      .load("/path/to/backfill/")
    )

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))