Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Dekorator @dp.append_flow membuat saluran lampiran atau pengisian ulang untuk tabel saluran Anda. Fungsi harus mengembalikan DataFrame streaming Apache Spark. Lihat Memuat dan memproses data secara bertahap dengan Lakeflow Spark Declarative Pipelines flows.
Alur lampiran bisa menargetkan tabel streaming atau sink.
Syntax
from pyspark import pipelines as dp
dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.
@dp.append_flow(
target = "<target-table-name>",
name = "<flow-name>", # optional, defaults to function name
once = <bool>, # optional, defaults to false
spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
comment = "<comment>") # optional
def <function-name>():
return (<streaming-query>) #
Parameter-parameternya
| Pengaturan | Tipe | Description |
|---|---|---|
| fungsi | function |
Dibutuhkan. Fungsi yang mengembalikan DataFrame streaming Apache Spark dari kueri yang ditentukan pengguna. |
target |
str |
Dibutuhkan. Nama tabel atau sink yang merupakan target alur penambahan. |
name |
str |
Nama alur. Jika tidak disediakan, akan otomatis menggunakan nama fungsi. |
once |
bool |
Secara opsional, tentukan alur sebagai alur satu kali, seperti isi ulang. Dengan menggunakan once=True, alurnya dapat diubah dengan dua cara:
|
comment |
str |
Deskripsi untuk alur. |
spark_conf |
dict |
Daftar konfigurasi Spark untuk eksekusi kueri ini |
Examples
from pyspark import pipelines as dp
# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})
# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
return <streaming-query>
# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
return (
spark.read
.format("json")
.load("/path/to/backfill/")
)
# Create a Kafka sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "host:port",
"topic": "my_topic"
}
)
# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))