Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Viktigt!
API:et update_flow finns i offentlig förhandsversion.
Använd dekoratören @dp.update_flow för att skapa ett uppdateringsflöde. Uppdatera flöden skrivs till mottagare med uppdateringsutdataläget, vilket endast genererar de rader som har ändrats i varje batch. Till skillnad från tilläggsflöden stöder de tillståndskänsliga aggregeringar utan att kräva en vattenstämpel.
Uppdateringsflöden kan bara rikta in sig på mottagare. Deltatabeller stöds inte.
Syntax
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parameters
| Parameter | Type | Description |
|---|---|---|
| function | function |
Required. En funktion som returnerar en Apache Spark-strömmande DataFrame från en användardefinierad fråga. |
target |
str |
Required. Namnet på mottagaren som det här flödet skriver till. |
name |
str |
Flödets namn Om det inte anges används funktionsnamnet som standard. |
comment |
str |
En beskrivning av flödet. |
spark_conf |
dict |
En diktering av Spark-konfigurationer för körningen av den här frågan. Dessa konfigurationer åsidosätter konfigurationer som angetts för mål, pipeline eller kluster. |
import_checkpoint |
str |
En extern kontrollpunktssökväg som ska importeras innan flödet startas. Importeras bara en gång, när flödets kontrollpunktskatalog ännu inte finns. |
Exempel
Sammansättning till en Kafka-mottagare
Skriva tillståndskänsliga aggregeringsresultat till en Kafka-mottagare:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
Realtidsläge
Använd spark_conf för att konfigurera ett uppdateringsflöde för realtidsläge:
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
Limitations
- Deltatabellmottagare stöds inte som mål för uppdateringsflöden.