update_flow

Important

Az update_flow API nyilvános előzetes verzióban érhető el.

@dp.update_flow A dekorátor használatával hozzon létre egy frissítési folyamatot. Frissítési kimeneti móddal frissíti a folyamatokat a fogadókba, és csak az egyes kötegekben módosított sorokat bocsátja ki. A hozzáfűző folyamatokkal ellentétben az állapotalapú összesítéseket vízjel nélkül támogatják.

A frissítési folyamatok csak fogadókat célozhatnak meg. A deltatáblák nem támogatottak.

Syntax

from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
  target = "<sink-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>", # optional
  import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
  return (<streaming-query>)

Parameters

Paraméter Típus Description
függvény function Kötelező. Egy függvény, amely egy Apache Spark streamelési DataFrame-et ad vissza egy felhasználó által megadott lekérdezésből.
target str Kötelező. Annak a fogadónak a neve, amelybe a folyamat ír.
name str A folyam neve. Ha nincs megadva, alapértelmezés szerint a függvény neve lesz.
comment str A folyamat leírása.
spark_conf dict A lekérdezés végrehajtásához szükséges Spark-konfigurációk diktálása. Ezek a konfigurációk felülbírálják a célhoz, folyamathoz vagy fürthöz beállított konföderációkat.
import_checkpoint str A folyamat elindítása előtt importálandó külső ellenőrzőpont-elérési út. Csak egyszer importálva, ha a folyamat ellenőrzőpont-könyvtára még nem létezik.

Examples

Kafka-fogadó összesítése

Állapotalapú összesítési eredményeket írhat egy Kafka-fogadóba:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type")
            .groupBy(col("event_type"))
            .count()
    )

Valós idejű mód

Frissítési spark_conf folyamat konfigurálása valós idejű módhoz:

from pyspark import pipelines as dp

dp.create_sink("my_kafka_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
    )

Limitations

  • A deltatábla-fogadók nem támogatottak a frissítési folyamatok céljaiként.