Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ważna
Interfejs update_flow API jest w publicznej wersji zapoznawczej.
Użyj dekoratora @dp.update_flow , aby utworzyć przepływ aktualizacji. Aktualizuj przepływy zapisu w ujściach przy użyciu trybu aktualizacji danych wyjściowych, emitując tylko wiersze, które uległy zmianie w każdej partii. W przeciwieństwie do przepływów dołączania obsługują agregacje stanowe bez konieczności używania znaku wodnego.
Przepływy aktualizacji mogą kierować tylko ujścia. Tabele różnicowe nie są obsługiwane.
Syntax
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parameters
| Parameter | Typ | Description |
|---|---|---|
| funkcja | function |
Required. Funkcja, która zwraca strumieniowy DataFrame w Apache Spark na podstawie zapytania zdefiniowanego przez użytkownika. |
target |
str |
Required. Nazwa ujścia, do których jest zapisywany ten przepływ. |
name |
str |
Nazwa przepływu. Jeśli nie zostanie podana, wartość domyślna to nazwa funkcji. |
comment |
str |
Opis przepływu. |
spark_conf |
dict |
Dykt konfiguracji platformy Spark na potrzeby wykonywania tego zapytania. Te konfiguracje zastępują konfiguracje ustawione dla miejsca docelowego, potoku lub klastra. |
import_checkpoint |
str |
Zewnętrzna ścieżka punktu kontrolnego do zaimportowania przed uruchomieniem przepływu. Zaimportowane tylko raz, gdy katalog punktu kontrolnego przepływu jeszcze nie istnieje. |
Examples
Agregacja do ujścia platformy Kafka
Zapisz wyniki agregacji stanowej do ujścia platformy Kafka:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
Tryb czasu rzeczywistego
Użyj spark_conf polecenia , aby skonfigurować przepływ aktualizacji dla trybu czasu rzeczywistego:
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
Ograniczenia
- Ujścia tabeli delty nie są obsługiwane jako obiekty docelowe dla przepływów aktualizacji.