Gunakan foreachBatch untuk menulis ke sink data arbitrer
Artikel ini membahas penggunaan foreachBatch
dengan Streaming Terstruktur untuk menulis output kueri streaming ke sumber data yang tidak memiliki sink streaming yang ada.
Pola streamingDF.writeStream.foreachBatch(...)
kode memungkinkan Anda menerapkan fungsi batch ke data output dari setiap batch mikro kueri streaming. Fungsi yang digunakan dengan foreachBatch
mengambil dua parameter:
- DataFrame yang memiliki data output mikro-batch.
- ID unik dari mikro-batch.
Anda harus menggunakan foreachBatch
untuk operasi penggabungan Delta Lake di Streaming Terstruktur. Lihat Upsert dari kueri streaming menggunakan foreachBatch.
Menerapkan operasi DataFrame tambahan
Banyak operasi DataFrame dan Dataset tidak didukung dalam streaming DataFrames karena Spark tidak mendukung pembuatan paket bertahap dalam kasus tersebut. Dengan menggunakan foreachBatch()
Anda dapat menerapkan beberapa operasi ini pada setiap output batch mikro. Misalnya, Anda dapat menggunakan foreachBath()
dan operasi SQL MERGE INTO
untuk menulis output agregasi streaming ke dalam tabel Delta dalam mode pembaruan. Lihat detail selengkapnya di MERGE INTO.
Penting
foreachBatch()
hanya memberikan jaminan menulis setidaknya sekali. Namun, Anda dapat menggunakanbatchId
yang disediakan untuk fungsi sebagai cara untuk menghilangkan duplikasi output dan mendapatkan jaminan yang tepat sekali. Dalam kedua kasus, Anda harus memiliki alasan sendiri tentang semantik secara menyeluruh.foreachBatch()
tidak berfungsi dengan mode pemrosesan kontinu karena pada dasarnya bergantung pada eksekusi batch mikro dari kueri streaming. Jika Anda menulis data dalam mode kontinu, gunakanforeach()
sebagai gantinya.
Kerangka data kosong dapat dipanggil dengan foreachBatch()
dan kode pengguna harus tangguh untuk memungkinkan operasi yang tepat. Contoh ditunjukkan di sini:
.foreachBatch(
(outputDf: DataFrame, bid: Long) => {
// Process valid data frames only
if (!outputDf.isEmpty) {
// business logic
}
}
).start()
Perubahan perilaku untuk foreachBatch
di Databricks Runtime 14.0
Dalam Databricks Runtime 14.0 ke atas pada komputasi yang dikonfigurasi dengan mode akses bersama, forEachBatch
berjalan dalam proses Python terisolasi terpisah pada Apache Spark, bukan di lingkungan REPL. Ini diserialisasikan dan didorong ke Spark dan tidak memiliki akses ke objek global spark
selama durasi sesi.
Dalam semua konfigurasi komputasi lainnya, foreachBatch
berjalan di Python REPL yang sama yang menjalankan sisa kode Anda. Akibatnya, fungsi tidak diserialisasikan.
Saat Anda menggunakan Databricks Runtime 14.0 ke atas pada komputasi yang dikonfigurasi dengan mode akses bersama, Anda harus menggunakan variabel yang sparkSession
dilingkupkan ke DataFrame lokal saat menggunakan foreachBatch
di Python, seperti dalam contoh kode berikut:
def example_function(df, batch_id):
df.sparkSession.sql("<query>")
Perubahan perilaku berikut berlaku:
- Anda tidak dapat mengakses variabel Python global apa pun dari dalam fungsi Anda.
print()
perintah menulis output ke log driver.- Setiap file, modul, atau objek yang dirujuk dalam fungsi harus dapat diserialisasikan dan tersedia di Spark.
Menggunakan kembali sumber data batch yang ada
Menggunakan foreachBatch()
, Anda dapat menggunakan penulis data batch yang ada untuk sink data yang mungkin tidak memiliki dukungan Streaming Terstruktur. Berikut beberapa contohnya:
Banyak sumber data batch lainnya dapat digunakan dari foreachBatch()
. Lihat Koneksi ke sumber data.
Tulis ke beberapa lokasi
Jika Anda perlu menulis output kueri streaming ke beberapa lokasi, Databricks merekomendasikan penggunaan beberapa penulis Streaming Terstruktur untuk paralelisasi dan throughput terbaik.
Menggunakan foreachBatch
untuk menulis ke beberapa sink membuat serialisasi eksekusi penulisan streaming, yang dapat meningkatkan latensi untuk setiap mikro-batch.
Jika Anda menggunakan foreachBatch
untuk menulis ke beberapa tabel Delta, lihat Penulisan tabel Idempotent di foreachBatch.
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk