Megosztás a következőn keresztül:


append_flow

A @dp.append_flow dekorátor hozzáfűző folyamatokat vagy visszatöltéseket hoz létre a folyamattáblákhoz. A függvénynek Apache Spark streaming DataFrame-et kell visszaadnia. Lásd az adatok növekményes betöltését és feldolgozását a Lakeflow Spark Declarative Pipelines flows segítségével.

A hozzáfűzési folyamok megcélozhatnak streamelési táblákat vagy adatfogadókat.

Szemantika

from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <bool>, # optional, defaults to false
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>) #

Paraméterek

Paraméter Típus Description
függvény function Szükséges. Egy függvény, amely egy Apache Spark streamelési DataFrame-et ad vissza egy felhasználó által megadott lekérdezésből.
target str Szükséges. A hozzáfűzési folyamat céltáblájának vagy kimenetének neve.
name str Az adatfolyam neve. Ha nincs megadva, alapértelmezés szerint a függvény neve lesz.
once bool Igény szerint definiálja a folyamatot egyszeri folyamatként, például visszatöltésként. A once=True használata kétféleképpen változtatja meg a folyamatot:
  • A visszatérési érték. streaming-query. Ebben az esetben kötegelt DataFrame-nek kell lennie, nem streamelési DataFrame-nek.
  • A folyamat alapértelmezés szerint egyszer fut. Ha a folyamat teljes frissítéssel frissül, akkor a ONCE folyamat újra fut az adatok újbóli létrehozásához.
comment str A folyamat leírása.
spark_conf dict A lekérdezés végrehajtásához szükséges Spark-konfigurációk listája

Példák

from pyspark import pipelines as dp

# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
    return (
      spark.read
      .format("json")
      .load("/path/to/backfill/")
    )

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))