Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Important
Rozhraní update_flow API je ve verzi Public Preview.
Pomocí dekorátoru @dp.update_flow vytvořte tok aktualizace. Aktualizujte toky zápisu do jímek pomocí režimu výstupu aktualizace a generují pouze řádky, které se v každé dávce změnily. Na rozdíl od přidávacích toků podporují stavové agregace bez nutnosti vodoznaku.
Toky aktualizací můžou cílit pouze na jímky. Tabulky Delta nejsou podporovány.
Syntax
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parameters
| Parameter | Typ | Description |
|---|---|---|
| funkce | function |
Required. Funkce, která vrací datový rámec streamování Apache Sparku z uživatelem definovaného dotazu. |
target |
str |
Required. Název jímky, do které tento tok zapíše. |
name |
str |
Název toku. Pokud není zadaný, nastaví se výchozí hodnota názvu funkce. |
comment |
str |
Popis toku |
spark_conf |
dict |
Diktování konfigurací Sparku pro spuštění tohoto dotazu. Tyto konfigurace přepisují konfigurace nastavené pro cíl, kanál nebo cluster. |
import_checkpoint |
str |
Cesta k externímu kontrolnímu bodu, která se má importovat před zahájením toku. Importován pouze jednou, pokud adresář kontrolního bodu toku ještě neexistuje. |
Příklady
Agregace do jímky Kafka
Zápis stavových výsledků agregace do jímky Kafka:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
Režim v reálném čase
Slouží spark_conf ke konfiguraci toku aktualizace pro režim v reálném čase:
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
Omezení
- Rozdílové jímky tabulek nejsou podporovány jako cíle pro toky aktualizací.