Gunakan ForEachBatch untuk menulis ke tujuan data sembarang dalam pipeline

Penting

foreach_batch_sink API berada di Pratinjau Umum.

Sink ForEachBatch memungkinkan Anda memproses aliran sebagai serangkaian batch mikro. Setiap batch dapat diproses dalam Python dengan logika kustom yang mirip dengan foreachBatch Apache Spark Structured Streaming. Dengan sink ForEachBatch Lakeflow Spark Declarative Pipelines (SDP), Anda dapat mengubah, menggabungkan, atau menulis data streaming ke satu atau beberapa target yang tidak secara bawaan mendukung penulisan streaming. Halaman ini membimbing Anda dalam menyiapkan sink ForEachBatch, memberikan contoh-contoh, dan membahas pertimbangan utama.

Sink ForEachBatch menyediakan fungsionalitas berikut:

  • Logika kustom untuk setiap mikro-batch: ForEachBatch adalah sink streaming yang fleksibel. Anda dapat menerapkan tindakan sembarang (seperti menggabungkan ke tabel eksternal, menulis ke beberapa tujuan, atau melakukan upsert) dengan kode Python.
  • Dukungan refresh penuh: Pipeline mengelola titik pemeriksaan berdasarkan masing-masing alur, sehingga titik pemeriksaan direset secara otomatis ketika Anda melakukan refresh penuh pada alur Anda. Dengan ForEachBatch sink, Anda bertanggung jawab untuk mengelola penyetelan ulang data yang mengalir ke bawah ketika hal ini terjadi.
  • Dukungan Katalog Unity: Sink ForEachBatch mendukung semua fitur Katalog Unity, seperti membaca dari atau menulis ke volume atau tabel Katalog Unity.
  • Housekeeping terbatas: Alur tidak melacak data apa yang ditulis dari sink ForEachBatch, jadi tidak dapat membersihkan data tersebut. Anda bertanggung jawab atas manajemen data hilir apa pun.
  • Entri log peristiwa: Log kejadian alur kerja mencatat pembuatan dan penggunaan setiap penampungan ForEachBatch. Jika fungsi Python Anda tidak dapat diserialisasikan, Anda akan melihat entri peringatan di log peristiwa dengan saran tambahan.

Nota

  • Sink ForEachBatch dirancang untuk kueri streaming, contohnya append_flow. Ini tidak ditujukan untuk alur khusus batch atau untuk AutoCDC semantik.
  • Sink ForEachBatch yang dijelaskan di halaman ini adalah untuk pipeline. Apache Spark Structured Streaming juga mendukung foreachBatch. Untuk informasi tentang Streaming TerstrukturforeachBatch, lihat Gunakan foreachBatch untuk menulis ke sink data arbitrer.

Kapan menggunakan penampung ForEachBatch

Gunakan sink ForEachBatch setiap kali alur Anda memerlukan fungsionalitas yang tidak tersedia melalui format sink bawaan seperti delta, atau kafka. Kasus penggunaan umum meliputi:

  • Menggabungkan atau melakukan upserting ke dalam tabel Delta Lake: Menjalankan logika penggabungan kustom untuk setiap mikro-batch (misalnya, menangani rekaman yang diperbarui).
  • Menulis ke beberapa tujuan atau tujuan yang tidak didukung: Tulis output setiap batch ke beberapa tabel atau sistem penyimpanan luar yang tidak mendukung penulisan streaming (seperti sink JDBC tertentu).
  • Aplikasikan logika atau transformasi kustom: Memanipulasi data dalam Python secara langsung (misalnya, menggunakan pustaka khusus atau transformasi tingkat lanjut).

Untuk informasi tentang sink bawaan, atau membuat sink kustom dengan Python, lihat Sinks di Lakeflow Spark Declarative Pipelines.

Syntax

Gunakan dekorasi @dp.foreach_batch_sink() untuk menghasilkan sink ForEachBatch. Anda kemudian dapat mereferensikan ini sebagai target dalam definisi alur Anda, misalnya di @dp.append_flow.

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
    """
    Required:
      - `df`: a Spark DataFrame representing the rows of this micro-batch.
      - `batch_id`: unique integer ID for each micro-batch in the query.
    """
    # Your custom write or transformation logic here
    # Example:
    # df.write.format("some-target-system").save("...")
    #
    # To access the sparkSession inside the batch handler, use df.sparkSession.
Pengaturan Description
Nama Optional. Nama unik untuk mengidentifikasi sink dalam alur. Diatur ke nama UDF secara default, ketika tidak disertakan.
batch_handler Ini adalah fungsi yang ditentukan pengguna (UDF) yang akan dipanggil untuk setiap batch mikro.
Df Spark DataFrame yang berisi data untuk mikro-batch saat ini.
batch_id ID bilangan bulat dari mikro-batch. Spark menaikkan ID ini untuk setiap interval pemicu.
Dari batch_id0 mewakili awal aliran, atau awal refresh penuh. Kode foreach_batch_sink harus menangani pembaruan penuh untuk sumber data hilir dengan tepat. Lihat bagian berikutnya untuk informasi selengkapnya.

Pembaruan penuh

Karena ForEachBatch menggunakan kueri streaming, pipeline melacak direktori titik pemeriksaan untuk setiap proses aliran. Pada refresh penuh:

  • Direktori titik pemeriksaan diatur ulang.
  • Fungsi sink Anda (foreach_batch_sink UDF) melihat siklus yang benar-benar baru batch_id mulai dari 0.
  • Data dalam sistem target Anda tidak dihapus secara otomatis oleh pipeline (karena pipeline tidak mengetahui lokasi penulisan data Anda). Jika Anda memerlukan skenario clean-slate, Anda harus menghilangkan atau memotong tabel atau lokasi eksternal yang diisi sink ForEachBatch Anda secara manual.

Menggunakan fitur Unity Catalog

Semua kemampuan Katalog Unity yang ada di Spark Structured Streaming foreach_batch_sink tetap tersedia.

Ini termasuk menulis ke tabel Unity Catalog terkelola atau eksternal. Anda dapat menulis batch mikro ke dalam tabel yang dikelola atau eksternal dari Unity Catalog persis seperti yang Anda lakukan dalam setiap pekerjaan Streaming Terstruktur Apache Spark.

Catatan log peristiwa

Saat Anda membuat sink ForEachBatch, sebuah peristiwa SinkDefinition dengan "format": "foreachBatch" ditambahkan ke log peristiwa alur.

Ini memungkinkan Anda melacak penggunaan sink ForEachBatch, dan melihat peringatan tentang sink Anda.

Menggunakan dengan Databricks Connect

Jika fungsi yang Anda berikan tidak dapat diserialisasikan (persyaratan penting untuk Databricks Connect), log peristiwa menyertakan WARN entri yang merekomendasikan agar Anda menyederhanakan atau merefaktor kode Anda jika dukungan Databricks Connect diperlukan.

Misalnya, jika Anda menggunakan dbutils untuk mendapatkan parameter dalam UDF ForEachBatch, Anda bisa mendapatkan argumen sebelum menggunakannya di UDF:

# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
  value = dbutils.widgets.get ("X") + str (i)

# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get ("X")

def foreach_batch(df, batchId):
  value = argX + str (i)

Praktik terbaik

  1. Jaga agar fungsi ForEachBatch Anda tetap ringkas: Hindari pemrosesan paralel, ketergantungan pustaka yang berat, atau manipulasi data dalam jumlah besar di memori. Logika kompleks atau stateful dapat menyebabkan kesalahan serialisasi atau kendala performa.
  2. Pantau folder checkpoint Anda: Untuk kueri streaming, SDP mengelola checkpoint berdasarkan alur, bukan berdasarkan sink. Jika Anda memiliki beberapa alur di alur Anda, setiap alur memiliki direktori titik pemeriksaannya sendiri.
  3. Validasi dependensi eksternal: Jika Anda mengandalkan sistem atau pustaka eksternal, periksa apakah dependensi tersebut diinstal pada semua node kluster atau di kontainer Anda.
  4. Perhatikan Databricks Connect: Jika lingkungan Anda mungkin pindah ke Databricks Connect di masa mendatang, pastikan bahwa kode Anda dapat diserialisasikan dan tidak bergantung pada dbutils di dalam UDF foreach_batch_sink.

Keterbatasan

  • Tidak ada housekeeping untuk ForEachBatch: Karena kode Python kustom Anda dapat menulis data di mana saja, alur tidak dapat membersihkan atau melacak data tersebut. Anda harus menangani manajemen data atau kebijakan penyimpanan Anda sendiri untuk tujuan yang Anda tulis.
  • Metrik dalam mikro-batch: Alur mengumpulkan metrik streaming, tetapi beberapa skenario dapat menyebabkan metrik yang tidak lengkap atau tidak biasa saat menggunakan ForEachBatch. Hal ini disebabkan oleh fleksibilitas yang terdapat dalam ForEachBatch yang membuat pelacakan aliran data dan baris menjadi sulit bagi sistem.
  • Mendukung penulisan ke beberapa tujuan tanpa perlu membaca berulang kali: Beberapa pelanggan dapat menggunakan ForEachBatch untuk membaca sekali dari sumber lalu menulis ke beberapa tujuan. Untuk mencapai hal ini, Anda harus menyertakan df.persist atau df.cache di dalam fungsi ForEachBatch Anda. Dengan menggunakan opsi ini, Azure Databricks akan mencoba menyiapkan data hanya satu kali. Tanpa opsi ini, kueri Anda akan menghasilkan beberapa bacaan. Ini tidak termasuk dalam contoh kode berikut.
  • Menggunakan dengan Databricks Connect: Jika alur Anda berjalan di Databricks Connect, foreachBatch fungsi yang ditentukan pengguna (UDF) harus dapat diserialisasikan dan tidak dapat menggunakan dbutils. Alur memunculkan peringatan jika mendeteksi UDF yang tidak dapat diserialisasikan, tetapi tidak gagal dalam alur.
  • Logika yang tidak dapat diserialisasi: Kode yang mereferensikan objek lokal, kelas, atau sumber daya yang tidak dapat dipickle dapat pecah dalam konteks Databricks Connect. Gunakan modul Python murni dan konfirmasikan bahwa referensi (misalnya, dbutils) tidak digunakan jika Databricks Connect adalah persyaratan.

Examples

Contoh sintaks dasar

from pyspark import pipelines as dp

# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
  # Custom logic here. You can perform merges,
  # write to multiple destinations, etc.
  return

# Create source data for example:
@dp.table()
def example_source_data():
  return spark.range(5)

# Add sink to an append flow:
@dp.append_flow(
    target="my_foreachbatch_sink",
)
def my_flow():
  return spark.readStream.format("delta").table("example_source_data")

Menggunakan data sampel untuk alur sederhana

Contoh ini menggunakan sampel Taksi NYC. Ini mengasumsikan bahwa admin ruang kerja Anda telah mengaktifkan katalog Himpunan Data Publik Databricks. Untuk sink, ubah my_catalog.my_schema ke katalog dan skema yang dapat Anda akses.

from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
    # Custom logic here. You can perform merges,
    # write to multiple destinations, etc.
    # For this example, we are adding a timestamp column.
    enriched = df.withColumn("processed_timestamp", current_timestamp())
    # Write to a Delta location
    enriched.write \
      .format("delta") \
      .mode("append") \
      .saveAsTable("my_catalog.my_schema.trips_sink_delta")
    # Return is optional here, but generally not used for the sink
    return

# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
    target="my_foreach_sink",
)
def taxi_source():
  df = spark.readStream.table("samples.nyctaxi.trips")
  return df

Penulisan ke berbagai tujuan

Contoh ini menulis ke beberapa tujuan. Ini menunjukkan penggunaan txnVersion dan txnAppId untuk membuat penulisan ke tabel Delta Lake idempoten. Untuk detailnya, lihat Menggunakan foreachBatch untuk penulisan tabel idempogen.

Misalkan kita menulis pada dua tabel, table_a dan table_b, dan misalkan dalam batch, penulisan ke table_a berhasil sementara penulisan ke table_b gagal. Ketika batch dijalankan kembali, pasangan (txnVersion, txnAppId) akan memungkinkan Delta untuk mengabaikan penulisan duplikat ke table_a, dan hanya menulis batch ke table_b.

from pyspark import pipelines as dp

app_id = "my-app-name" # different applications that write to the same table should have unique txnAppId

# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
    # Optionally do transformations, logging, or merging logic
    # ...

    # Write to a Delta table
    df.write \
     .format("delta") \
     .mode("append") \
     .option("txnVersion", batch_id) \
     .option("txnAppId", app_id) \
     .saveAsTable("my_catalog.my_schema.example_table_1")

    # Also write to a JSON file location
    df.write \
      .format("json") \
      .mode("append") \
      .option("txnVersion", batch_id) \
      .option("txnAppId", app_id) \
      .save("/tmp/json_target")
    return

# Create source data for example
@dp.table()
def example_source():
  return spark.range(5)


# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
    return spark.readStream.format("delta").table("example_source")

Menggunakan spark.sql()

Anda dapat menggunakan spark.sql() di sink ForEachBatch Anda, seperti dalam contoh berikut.

from pyspark import pipelines as dp
from pyspark.sql import Row

@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
  df.createOrReplaceTempView("df_view")
  df.sparkSession.sql("MERGE INTO target_table AS tgt " +
            "USING df_view AS src ON tgt.id = src.id " +
            "WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
            "WHEN NOT MATCHED THEN INSERT (id) VALUES (id)"
          )
  return

# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")

# Create source table
@dp.table()
def src_table():
  return spark.range(5)

@dp.append_flow(
    target="example_sink",
)
def example_flow():
  return spark.readStream.format("delta").table("source_table")

Tanya jawab umum (FAQ)

Dapatkah saya menggunakan dbutils di sink ForEachBatch saya?

Jika Anda berencana untuk menjalankan alur Anda di lingkungan non-Databricks Connect, dbutils mungkin berfungsi. Namun, jika Anda menggunakan Databricks Connect, dbutils tidak dapat diakses dalam fungsi Anda foreachBatch . Alur kerja dapat memunculkan peringatan jika mendeteksi penggunaan dbutils untuk membantu Anda menghindari gangguan.

Dapatkah saya menggunakan beberapa alur dengan satu sink ForEachBatch?

Ya. Anda dapat menentukan beberapa alur (dengan @dp.append_flow) yang semuanya menargetkan nama sink yang sama, tetapi masing-masing mempertahankan titik pemeriksaan mereka sendiri.

Apakah jalur pemrosesan menangani retensi atau pembersihan data untuk sasaran saya?

Tidak. Karena sink ForEachBatch dapat menulis ke lokasi atau sistem arbitrer apa pun, alur tidak dapat secara otomatis mengelola atau menghapus data dalam target tersebut. Anda harus menangani operasi ini sebagai bagian dari kode kustom atau proses eksternal Anda.

Bagaimana cara memecahkan masalah kesalahan atau kegagalan serialisasi dalam fungsi ForEachBatch saya?

Lihat log kluster driver atau log kejadian pipeline Anda. Untuk masalah serialisasi terkait Spark Connect, periksa apakah fungsi Anda hanya bergantung pada objek Python yang dapat diserialisasikan dan tidak mereferensikan objek yang tidak diizinkan (seperti handel file terbuka atau dbutils).