Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
Important
Az update_flow API nyilvános előzetes verzióban érhető el.
@dp.update_flow A dekorátor használatával hozzon létre egy frissítési folyamatot. Frissítési kimeneti móddal frissíti a folyamatokat a fogadókba, és csak az egyes kötegekben módosított sorokat bocsátja ki. A hozzáfűző folyamatokkal ellentétben az állapotalapú összesítéseket vízjel nélkül támogatják.
A frissítési folyamatok csak fogadókat célozhatnak meg. A deltatáblák nem támogatottak.
Syntax
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parameters
| Paraméter | Típus | Description |
|---|---|---|
| függvény | function |
Kötelező. Egy függvény, amely egy Apache Spark streamelési DataFrame-et ad vissza egy felhasználó által megadott lekérdezésből. |
target |
str |
Kötelező. Annak a fogadónak a neve, amelybe a folyamat ír. |
name |
str |
A folyam neve. Ha nincs megadva, alapértelmezés szerint a függvény neve lesz. |
comment |
str |
A folyamat leírása. |
spark_conf |
dict |
A lekérdezés végrehajtásához szükséges Spark-konfigurációk diktálása. Ezek a konfigurációk felülbírálják a célhoz, folyamathoz vagy fürthöz beállított konföderációkat. |
import_checkpoint |
str |
A folyamat elindítása előtt importálandó külső ellenőrzőpont-elérési út. Csak egyszer importálva, ha a folyamat ellenőrzőpont-könyvtára még nem létezik. |
Examples
Kafka-fogadó összesítése
Állapotalapú összesítési eredményeket írhat egy Kafka-fogadóba:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
Valós idejű mód
Frissítési spark_conf folyamat konfigurálása valós idejű módhoz:
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
Limitations
- A deltatábla-fogadók nem támogatottak a frissítési folyamatok céljaiként.