update_flow

Important

update_flow API Genel Kullanıma Açık Önizleme aşamasındadır.

Bir güncelleştirme akışı oluşturmak için dekoratör kullanın @dp.update_flow . Güncelleştirme akışları, yalnızca her toplu işlemde değiştirilen satırları yayarak güncelleştirme çıkış modunu kullanarak havuzlara yazar. Ekleme akışlarından farklı olarak, filigran gerektirmeden durum bilgisi olan toplamaları destekler.

Güncelleştirme akışları yalnızca havuzları hedefleyebilir. Delta tabloları desteklenmez.

Syntax

from pyspark import pipelines as dp

dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})

@dp.update_flow(
  target = "<sink-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
  comment = "<comment>", # optional
  import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
  return (<streaming-query>)

Parameters

Parametre Türü Description
function function Gerekli. Kullanıcı tanımlı bir sorgudan Apache Spark akış DataFrame'i döndüren işlev.
target str Gerekli. Bu akışın yazdığı havuz adı.
name str Akış adı. Sağlanmadıysa, varsayılan olarak işlev adını kullanır.
comment str Akış açıklaması.
spark_conf dict Bu sorgunun yürütülmesi için Spark yapılandırmalarının bir diktesi. Bu yapılandırmalar hedef, işlem hattı veya küme için ayarlanan konfederasyonları geçersiz kılar.
import_checkpoint str Akışı başlatmadan önce içeri aktarılacağınız dış denetim noktası yolu. Akışın denetim noktası dizini henüz mevcut olmadığında yalnızca bir kez içeri aktarılır.

Examples

Kafka havuzuna toplama

Durum bilgisi olan toplama sonuçlarını kafka havuzuna yazın:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type")
            .groupBy(col("event_type"))
            .count()
    )

Gerçek zamanlı mod

Güncelleştirme akışını gerçek zamanlı mod için yapılandırmak için kullanınspark_conf:

from pyspark import pipelines as dp

dp.create_sink("my_kafka_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
    )

Sınırlamalar

  • Delta tablo havuzları, güncelleştirme akışları için hedef olarak desteklenmez.