Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Important
update_flow API Genel Kullanıma Açık Önizleme aşamasındadır.
Bir güncelleştirme akışı oluşturmak için dekoratör kullanın @dp.update_flow . Güncelleştirme akışları, yalnızca her toplu işlemde değiştirilen satırları yayarak güncelleştirme çıkış modunu kullanarak havuzlara yazar.
Ekleme akışlarından farklı olarak, filigran gerektirmeden durum bilgisi olan toplamaları destekler.
Güncelleştirme akışları yalnızca havuzları hedefleyebilir. Delta tabloları desteklenmez.
Syntax
from pyspark import pipelines as dp
dp.create_sink("<sink-name>", "<format>", {"<key>": "<value>"})
@dp.update_flow(
target = "<sink-name>",
name = "<flow-name>", # optional, defaults to function name
spark_conf = {"<key>" : "<value>", "<key>" : "<value>"}, # optional
comment = "<comment>", # optional
import_checkpoint = "<checkpoint-path>") # optional
def <function-name>():
return (<streaming-query>)
Parameters
| Parametre | Türü | Description |
|---|---|---|
| function | function |
Gerekli. Kullanıcı tanımlı bir sorgudan Apache Spark akış DataFrame'i döndüren işlev. |
target |
str |
Gerekli. Bu akışın yazdığı havuz adı. |
name |
str |
Akış adı. Sağlanmadıysa, varsayılan olarak işlev adını kullanır. |
comment |
str |
Akış açıklaması. |
spark_conf |
dict |
Bu sorgunun yürütülmesi için Spark yapılandırmalarının bir diktesi. Bu yapılandırmalar hedef, işlem hattı veya küme için ayarlanan konfederasyonları geçersiz kılar. |
import_checkpoint |
str |
Akışı başlatmadan önce içeri aktarılacağınız dış denetim noktası yolu. Akışın denetim noktası dizini henüz mevcut olmadığında yalnızca bir kez içeri aktarılır. |
Examples
Kafka havuzuna toplama
Durum bilgisi olan toplama sonuçlarını kafka havuzuna yazın:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type")
.groupBy(col("event_type"))
.count()
)
Gerçek zamanlı mod
Güncelleştirme akışını gerçek zamanlı mod için yapılandırmak için kullanınspark_conf:
from pyspark import pipelines as dp
dp.create_sink("my_kafka_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
)
Sınırlamalar
- Delta tablo havuzları, güncelleştirme akışları için hedef olarak desteklenmez.