Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Dekoratör, @dp.append_flow işlem hattı tablolarınız için ekleme akışları veya geri doldurmalar oluşturur. İşlev bir Apache Spark akış veri çerçevesi döndürmelidir. Bkz. Lakeflow Spark Bildirimli İşlem Hatları akışlarıyla verileri artımlı olarak yükleme ve işleme.
Ekleme akışları akış tablolarını veya havuzlarını hedefleyebilir.
Sözdizimi
from pyspark import pipelines as dp
dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dp.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
once = <bool>, # optional, defaults to false
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming-query>) #
Parametreler
| Parametre | Türü | Description |
|---|---|---|
| function | function |
Gerekli. Kullanıcı tanımlı bir sorgudan Apache Spark akış DataFrame'i döndüren işlev. |
target |
str |
Gerekli. Ekleme akışının hedefi olan tablo veya havuzun adı. |
name |
str |
Akış adı. Sağlanmadıysa, varsayılan olarak işlev adını kullanır. |
once |
bool |
İsteğe bağlı olarak, akışı yedek doldurma gibi tek seferlik bir akış olarak tanımlayın. Kullanımı once=True , akışı iki şekilde değiştirir:
|
comment |
str |
Akış açıklaması. |
spark_conf |
dict |
Bu sorgunun yürütülmesi için Spark yapılandırmalarının listesi |
Örnekler
from pyspark import pipelines as dp
# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
return (
spark.read
.format("json")
.load("/path/to/backfill/")
)
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))