update_flow

Ważna

Interfejs update_flow API jest w publicznej wersji zapoznawczej.

Użyj dekoratora @dp.update_flow , aby utworzyć przepływ aktualizacji. Aktualizuj przepływy zapisu w ujściach przy użyciu trybu aktualizacji danych wyjściowych, emitując tylko wiersze, które uległy zmianie w każdej partii. W przeciwieństwie do przepływów dołączania obsługują agregacje stanowe bez konieczności używania znaku wodnego.

Przepływy aktualizacji mogą kierować tylko ujścia. Tabele różnicowe nie są obsługiwane.

Syntax

from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
  target = "<sink-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>", # optional
  import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
  return (<streaming-query>)

Parameters

Parameter Typ Description
funkcja function Required. Funkcja, która zwraca strumieniowy DataFrame w Apache Spark na podstawie zapytania zdefiniowanego przez użytkownika.
target str Required. Nazwa ujścia, do których jest zapisywany ten przepływ.
name str Nazwa przepływu. Jeśli nie zostanie podana, wartość domyślna to nazwa funkcji.
comment str Opis przepływu.
spark_conf dict Dykt konfiguracji platformy Spark na potrzeby wykonywania tego zapytania. Te konfiguracje zastępują konfiguracje ustawione dla miejsca docelowego, potoku lub klastra.
import_checkpoint str Zewnętrzna ścieżka punktu kontrolnego do zaimportowania przed uruchomieniem przepływu. Zaimportowane tylko raz, gdy katalog punktu kontrolnego przepływu jeszcze nie istnieje.

Examples

Agregacja do ujścia platformy Kafka

Zapisz wyniki agregacji stanowej do ujścia platformy Kafka:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type")
            .groupBy(col("event_type"))
            .count()
    )

Tryb czasu rzeczywistego

Użyj spark_conf polecenia , aby skonfigurować przepływ aktualizacji dla trybu czasu rzeczywistego:

from pyspark import pipelines as dp

dp.create_sink("my_kafka_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
    )

Ograniczenia

  • Ujścia tabeli delty nie są obsługiwane jako obiekty docelowe dla przepływów aktualizacji.