Bagikan melalui


append_flow

Dekorator @dp.append_flow membuat saluran lampiran atau pengisian ulang untuk tabel saluran Anda. Fungsi harus mengembalikan DataFrame streaming Apache Spark. Lihat Memuat dan memproses data secara bertahap dengan Lakeflow Spark Declarative Pipelines flows.

Alur lampiran bisa menargetkan tabel streaming atau sink.

Syntax

from pyspark import pipelines as dp

dp.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dp.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  once = <bool>, # optional, defaults to false
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming-query>) #

Parameter-parameternya

Pengaturan Tipe Description
fungsi function Dibutuhkan. Fungsi yang mengembalikan DataFrame streaming Apache Spark dari kueri yang ditentukan pengguna.
target str Dibutuhkan. Nama tabel atau sink yang merupakan target alur penambahan.
name str Nama alur. Jika tidak disediakan, akan otomatis menggunakan nama fungsi.
once bool Secara opsional, tentukan alur sebagai alur satu kali, seperti isi ulang. Dengan menggunakan once=True, alurnya dapat diubah dengan dua cara:
  • Nilai yang dikembalikan. streaming-query. harus berupa DataFrame batch dalam hal ini, bukan DataFrame streaming.
  • Alur dijalankan satu kali secara default. Jika alur diperbarui dengan pembaruan lengkap, maka ONCE alur akan berjalan kembali untuk membuat ulang data.
comment str Deskripsi untuk alur.
spark_conf dict Daftar konfigurasi Spark untuk eksekusi kueri ini

Examples

from pyspark import pipelines as dp

# Create a sink for an external Delta table
dp.create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

# Add an append flow to an external Delta table
@dp.append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return <streaming-query>

# Add a backfill
@dp.append_flow(name = "backfill", target = "my_sink", once = True)
def backfillFlowFunc():
    return (
      spark.read
      .format("json")
      .load("/path/to/backfill/")
    )

# Create a Kafka sink
dp.create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    "topic": "my_topic"
  }
)

# Add an append flow to a Kafka sink
@dp.append_flow(name = "flow", target = "my_kafka_sink")
def myFlow():
  return read_stream("xxx").select(F.to_json(F.struct("*")).alias("value"))