Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Halaman ini memperlihatkan cara menggunakan foreachBatch dengan Streaming Terstruktur untuk menulis output kueri streaming ke sumber data yang tidak memiliki sink streaming yang ada.
Pola streamingDF.writeStream.foreachBatch(...) kode memungkinkan Anda menerapkan fungsi batch ke data keluaran dari setiap micro-batch kueri streaming. Fungsi yang digunakan dengan foreachBatch mengambil dua parameter:
- Sebuah DataFrame yang berisi data output dari mikro-batch.
- ID unik dari mikro-batch.
Anda harus menggunakan foreachBatch untuk operasi penggabungan Delta Lake di Streaming Terstruktur. Lihat Upsert dari kueri streaming menggunakan foreachBatch.
Menerapkan operasi DataFrame tambahan
Banyak operasi DataFrame dan Dataset tidak didukung dalam streaming DataFrames karena Spark tidak mendukung pembuatan rencana inkremental dalam kasus tersebut. Dengan menggunakan foreachBatch() Anda dapat menerapkan beberapa operasi ini pada setiap output batch mikro. Misalnya, Anda dapat menggunakan foreachBatch() dan operasi SQL MERGE INTO untuk menulis output agregasi streaming ke dalam tabel Delta dalam mode pembaruan. Lihat detail selengkapnya di MERGE INTO.
Penting
-
foreachBatch()hanya memberikan jaminan menulis setidaknya sekali. Namun, Anda dapat menggunakanbatchIdyang disediakan untuk fungsi sebagai metode untuk menghilangkan duplikasi output dan mendapatkan jaminan tepat sekali. Dalam kedua kasus, Anda harus memiliki alasan sendiri tentang semantik secara menyeluruh. -
foreachBatch()tidak berfungsi dengan mode pemrosesan kontinu karena pada dasarnya bergantung pada eksekusi batch mikro dari kueri streaming. Jika Anda menulis data dalam mode kontinu, gunakanforeach()sebagai gantinya. - Saat menggunakan
foreachBatchdengan operator stateful, penting untuk sepenuhnya mengonsumsi setiap batch sebelum pemrosesan selesai. Lihat Mengonsumsi setiap DataFrame batch sepenuhnya
Menangani DataFrame kosong
foreachBatch() mungkin menerima DataFrame kosong, dan kode Anda harus menangani skenario ini. Jika tidak, kueri Anda mungkin gagal.
Misalnya, ketika Delta Lake adalah sumber streaming, skenario ini mungkin meneruskan DataFrame kosong ke foreachBatch():
-
OPTIMIZEjika tidak ada file untuk diproses: Ketika operasiOPTIMIZEdijalankan pada tabel sumber Delta Lake tetapi tidak ada file untuk diproses, Structured Streaming menulis entri log offset untuk meningkatkan versi tabel. Ini menghasilkan mikro-batch kosong pada sink meskipun tidak ada file yang dibaca. - Pemangkasan file pada tingkat rencana fisik: Jika pendorongan predikat atau pemangkasan file menghilangkan semua rekaman di tingkat rencana fisik, hasilnya adalah commit yang kosong ke dalam sink.
Kode pengguna harus menangani DataFrames kosong untuk memungkinkan operasi yang tepat. Lihat contoh di bawah ini:
Python
def process_batch(output_df, batch_id):
# Process valid DataFrames only
if not output_df.isEmpty():
# business logic
pass
streamingDF.writeStream.foreachBatch(process_batch).start()
Scala
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid DataFrames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Perubahan perilaku untuk foreachBatch di Databricks Runtime 14.0
Dalam Databricks Runtime 14.0 ke atas pada komputasi yang dikonfigurasi dengan mode akses standar, perubahan perilaku berikut berlaku:
-
print()perintah menulis output ke log driver. - Anda tidak dapat mengakses
dbutils.widgetssubmodul di dalam fungsi. - Setiap file, modul, atau objek yang dirujuk dalam fungsi harus dapat diserialisasikan dan tersedia di Spark.
Menggunakan kembali sumber data batch yang ada
Menggunakan foreachBatch(), Anda dapat menggunakan penulis data batch yang ada untuk data sink yang mungkin tidak memiliki dukungan Structured Streaming. Berikut beberapa contohnya:
Banyak sumber data batch lainnya dapat digunakan dari foreachBatch(). Lihat Menyambungkan ke sumber data dan layanan eksternal.
Tulis ke beberapa lokasi
Jika Anda perlu menulis output kueri streaming ke beberapa lokasi, Databricks merekomendasikan penggunaan beberapa penulis Streaming Terstruktur untuk paralelisasi dan throughput terbaik.
Menggunakan foreachBatch untuk menulis ke beberapa penampung melakukan serialisasi tulisan streaming, yang dapat menambah latensi setiap mikro-batch.
Jika Anda menggunakan foreachBatch untuk menulis ke beberapa tabel Delta, lihat Menggunakan foreachBatch untuk penulisan tabel idempotensi.
Mengonsumsi setiap batch DataFrame sepenuhnya
Saat Anda menggunakan operator stateful (misalnya, menggunakan dropDuplicatesWithinWatermark), setiap iterasi batch harus menggunakan seluruh DataFrame atau memulai ulang kueri. Jika Anda tidak menggunakan seluruh DataFrame, kueri streaming akan gagal dengan batch berikutnya.
Ini dapat terjadi dalam beberapa kasus. Contoh berikut menunjukkan cara memperbaiki kueri yang tidak menggunakan DataFrame dengan benar.
Sengaja menggunakan subset dari kelompok data.
Jika Anda hanya peduli tentang subset batch, Anda bisa memiliki kode seperti berikut.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def partial_func(batch_df, batch_id):
batch_df.show(2)
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Dalam hal ini, batch_df.show(2) hanya menangani dua item pertama dalam batch, yang diharapkan, tetapi jika ada lebih banyak item, item-item tersebut harus dikonsumsi. Kode berikut menggunakan DataFrame lengkap.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
batch_df.show(2)
batch_df.foreach(do_nothing) # silently consume the rest of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Di sini, fungsi do_nothing secara diam-diam mengabaikan sisa DataFrame.
Menangani kesalahan dalam pemrosesan batch
Untuk penanganan kesalahan di foreachBatch, Databricks merekomendasikan agar Anda membiarkan kueri streaming mengalami kegagalan cepat dan sebaliknya mengandalkan lapisan orkestrasi, seperti Pekerjaan Lakeflow atau Apache Airflow, untuk mengelola logika mencoba ulang. Ini jauh lebih aman daripada membangun perulangan coba lagi yang kompleks dalam kode Anda, di mana kehilangan data dapat terjadi.
Berikut adalah panduan berdasarkan target tulis Anda:
| Target | Examples | Bimbingan |
|---|---|---|
| Operasi DataFrame | Tabel Delta Lake | Anda harus menggunakan opsi txnAppId dan opsi tulis txnVersion, dengan mengikat txnVersion ke batchId, untuk menjamin idempotensi dan melindungi kebenaran data pada percobaan ulang. Jangan menangkap dan mencoba kembali pengecualian secara lokal. Sebaliknya, Databricks merekomendasikan agar Anda mengizinkan kesalahan berlangsung agar metrik Spark tetap akurat, data tidak menduplikasi, dan orkestrator dapat dengan lancar mencoba kembali batch lengkap. |
| Kode kustom dan tujuan eksternal |
.collect(), database OLTP, antrean pesan, API |
Terapkan idempotensi Anda sendiri. Anda harus berasumsi bahwa setiap operasi dapat dan akan dicoba kembali di seluruh batch. Jika batchId tetap sama, maka hasil operasi Anda harus tetap sama. Anda dapat mencoba lagi kesalahan yang benar-benar sementara seperti pemutusan koneksi singkat, tetapi berhati-hatilah untuk menghindari penulisan parsial atau duplikat jika percobaan ulang akhirnya gagal. Pendekatan paling aman adalah membiarkan kesalahan menyebar dan memungkinkan orkestrator untuk mencoba kembali seluruh batch. |
Berikut adalah beberapa contoh jenis pengecualian dan rekomendasi tentang cara menanganinya di foreachBatch:
| Jenis pengecualian | Examples | Tindakan yang disarankan |
|---|---|---|
| Kesalahan pengumpulan sementara |
SQLTransientConnectionException, HTTP 429, batas waktu |
Tangkap: coba lagi, atau kirim ke antrean surat mati |
| Pelanggaran batasan duplikat atau kunci saat sink idempogen | SQLIntegrityConstraintViolationException |
Tangkap: catat dan sembunyikan |
| Kesalahan kustom yang dapat diulang | Pengecualian soket terbungkus, kesalahan database yang dapat diulang kembali | Tangkap: menaikkan metrik dan memungkinkan kelanjutan terkontrol |
| Kesalahan logika atau skema |
NullPointerException, AttributeError, ketidakcocokan skema |
Menyebarluaskan: biarkan Spark gagal kueri |
| Kesalahan sink yang tidak dapat diulang atau bug logika yang tidak tertangkap |
ValueError, PermissionError |
Menyebarluaskan: biarkan Spark gagal kueri |
| Kegagalan kritis |
OutOfMemoryError, status rusak, pelanggaran integritas data |
Menyebarluaskan: biarkan Spark gagal kueri |
Contoh kode: penanganan pengecualian
Contoh berikut sengaja menimbulkan kesalahan foreach untuk menunjukkan pendekatan yang berbeda untuk menangani kesalahan:
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()
Kode di atas menangani dan secara diam-diam menekan kesalahan, dan mungkin tidak mengonsumsi sisa batch. Ada dua opsi untuk menangani situasi ini.
Pertama, Anda dapat mengangkat kembali kesalahan tersebut, yang akan meneruskannya ke lapisan orkestrasi Anda untuk melakukan pengulangan batch. Ini dapat menyelesaikan kesalahan, jika itu adalah masalah sementara, atau meningkatkannya bagi tim operasi Anda untuk mencoba memperbaiki secara manual. Untuk melakukan ini, ubah partial_func kode agar terlihat seperti ini:
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
raise e # re-raise the issue
Kedua, jika Anda ingin menangkap pengecualian dan mengabaikan sisa batch, Anda dapat mengubah kode untuk menggunakan do_nothing fungsi untuk mengabaikan sisa batch secara diam-diam.
from pyspark.sql.functions import expr
stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()
def foreach_func(row):
# handle the row, but in this case, for the sample, will just raise an error:
raise Exception('error')
# function to do nothing with a row
def do_nothing(row):
pass
def partial_func(batch_df, batch_id):
try:
batch_df.foreach(foreach_func)
except Exception as e:
print(e) # or whatever error handling you want to have
batch_df.foreach(do_nothing) # silently consume the remainder of the batch
q = streamWithWatermark.writeStream \
.foreachBatch(partial_func) \
.option("checkpointLocation", checkpoint_dir) \
.trigger(processingTime='2 seconds') \
.start()