update_flow

Important

Rozhraní update_flow API je ve verzi Public Preview.

Pomocí dekorátoru @dp.update_flow vytvořte tok aktualizace. Aktualizujte toky zápisu do jímek pomocí režimu výstupu aktualizace a generují pouze řádky, které se v každé dávce změnily. Na rozdíl od přidávacích toků podporují stavové agregace bez nutnosti vodoznaku.

Toky aktualizací můžou cílit pouze na jímky. Tabulky Delta nejsou podporovány.

Syntax

from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
  target = "<sink-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>", # optional
  import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
  return (<streaming-query>)

Parameters

Parameter Typ Description
funkce function Required. Funkce, která vrací datový rámec streamování Apache Sparku z uživatelem definovaného dotazu.
target str Required. Název jímky, do které tento tok zapíše.
name str Název toku. Pokud není zadaný, nastaví se výchozí hodnota názvu funkce.
comment str Popis toku
spark_conf dict Diktování konfigurací Sparku pro spuštění tohoto dotazu. Tyto konfigurace přepisují konfigurace nastavené pro cíl, kanál nebo cluster.
import_checkpoint str Cesta k externímu kontrolnímu bodu, která se má importovat před zahájením toku. Importován pouze jednou, pokud adresář kontrolního bodu toku ještě neexistuje.

Příklady

Agregace do jímky Kafka

Zápis stavových výsledků agregace do jímky Kafka:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type")
            .groupBy(col("event_type"))
            .count()
    )

Režim v reálném čase

Slouží spark_conf ke konfiguraci toku aktualizace pro režim v reálném čase:

from pyspark import pipelines as dp

dp.create_sink("my_kafka_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
    )

Omezení

  • Rozdílové jímky tabulek nejsou podporovány jako cíle pro toky aktualizací.