Bagikan melalui


Gunakan foreachBatch untuk menulis ke sink data arbitrer

Artikel ini membahas penggunaan foreachBatch dengan Streaming Terstruktur untuk menulis output kueri streaming ke sumber data yang tidak memiliki sink streaming yang ada.

Pola streamingDF.writeStream.foreachBatch(...) kode memungkinkan Anda menerapkan fungsi batch ke data output dari setiap batch mikro kueri streaming. Fungsi yang digunakan dengan foreachBatch mengambil dua parameter:

  • DataFrame yang memiliki data output mikro-batch.
  • ID unik dari mikro-batch.

Anda harus menggunakan foreachBatch untuk operasi penggabungan Delta Lake di Streaming Terstruktur. Lihat Upsert dari kueri streaming menggunakan foreachBatch.

Menerapkan operasi DataFrame tambahan

Banyak operasi DataFrame dan Dataset tidak didukung dalam streaming DataFrames karena Spark tidak mendukung pembuatan paket bertahap dalam kasus tersebut. Dengan menggunakan foreachBatch() Anda dapat menerapkan beberapa operasi ini pada setiap output batch mikro. Misalnya, Anda dapat menggunakan foreachBatch() dan operasi SQL MERGE INTO untuk menulis output agregasi streaming ke dalam tabel Delta dalam mode pembaruan. Lihat detail selengkapnya di MERGE INTO.

Penting

  • foreachBatch() hanya memberikan jaminan menulis setidaknya sekali. Namun, Anda dapat menggunakan batchId yang disediakan untuk fungsi sebagai metode untuk menghilangkan duplikasi output dan mendapatkan jaminan tepat sekali. Dalam kedua kasus, Anda harus memiliki alasan sendiri tentang semantik secara menyeluruh.
  • foreachBatch() tidak berfungsi dengan mode pemrosesan kontinu karena pada dasarnya bergantung pada eksekusi batch mikro dari kueri streaming. Jika Anda menulis data dalam mode kontinu, gunakan foreach() sebagai gantinya.
  • Saat menggunakan foreachBatch dengan operator stateful, penting untuk sepenuhnya mengonsumsi setiap batch sebelum pemrosesan selesai. Lihat Mengonsumsi setiap DataFrame batch sepenuhnya

Kerangka data kosong dapat dipanggil dengan foreachBatch() dan kode pengguna harus tangguh untuk memungkinkan operasi yang tepat. Contoh ditunjukkan di sini:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Perubahan perilaku untuk foreachBatch di Databricks Runtime 14.0

Dalam Databricks Runtime 14.0 ke atas pada komputasi yang dikonfigurasi dengan mode akses standar, perubahan perilaku berikut berlaku:

  • print() perintah menulis output ke log driver.
  • Anda tidak dapat mengakses dbutils.widgets submodul di dalam fungsi.
  • Setiap file, modul, atau objek yang dirujuk dalam fungsi harus dapat diserialisasikan dan tersedia di Spark.

Menggunakan kembali sumber data batch yang ada

Menggunakan foreachBatch(), Anda dapat menggunakan penulis data batch yang ada untuk sink data yang mungkin tidak memiliki dukungan Streaming Terstruktur. Berikut beberapa contohnya:

Banyak sumber data batch lainnya dapat digunakan dari foreachBatch(). Lihat Menyambungkan ke sumber data dan layanan eksternal.

Tulis ke beberapa lokasi

Jika Anda perlu menulis output kueri streaming ke beberapa lokasi, Databricks merekomendasikan penggunaan beberapa penulis Streaming Terstruktur untuk paralelisasi dan throughput terbaik.

Menggunakan foreachBatch untuk menulis ke beberapa sink membuat serialisasi eksekusi penulisan streaming, yang dapat meningkatkan latensi untuk setiap mikro-batch.

Jika Anda menggunakan foreachBatch untuk menulis ke beberapa tabel Delta, lihat Penulisan tabel Idempotent di foreachBatch.

Mengonsumsi setiap batch DataFrame sepenuhnya

Saat Anda menggunakan operator stateful (misalnya, menggunakan dropDuplicatesWithinWatermark), setiap iterasi batch harus menggunakan seluruh DataFrame atau memulai ulang kueri. Jika Anda tidak menggunakan seluruh DataFrame, kueri streaming akan gagal dengan batch berikutnya.

Ini dapat terjadi dalam beberapa kasus. Contoh berikut menunjukkan cara memperbaiki kueri yang tidak menggunakan DataFrame dengan benar.

Sengaja menggunakan subset dari kelompok data.

Jika Anda hanya peduli tentang subset batch, Anda bisa memiliki kode seperti berikut.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
  batch_df.show(2)

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Dalam hal ini, batch_df.show(2) hanya menangani dua item pertama dalam batch, yang diharapkan, tetapi jika ada lebih banyak item, item-item tersebut harus dikonsumsi. Kode berikut menggunakan DataFrame lengkap.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row)
  pass

def partial_func(batch_df, batch_id):
  batch_df.show(2)
  batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Di sini, fungsi do_nothing secara diam-diam mengabaikan sisa DataFrame.

Menangani kesalahan dalam pemrosesan batch

Mungkin ada kesalahan saat menjalankan foreachBatch proses. Anda dapat memiliki kode seperti berikut (dalam hal ini, sampel sengaja menimbulkan kesalahan untuk menunjukkan masalah).

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Dengan menangani (dan menelan secara diam-diam) kesalahan, sisa batch mungkin tidak digunakan. Ada dua opsi untuk menangani situasi ini.

Pertama, Anda dapat mengirim ulang kesalahan, yang mengarahkannya ke lapisan orkestrasi Anda agar batch dapat diulang. Ini dapat menyelesaikan kesalahan, jika itu adalah masalah sementara, atau meningkatkannya bagi tim operasi Anda untuk mencoba memperbaiki secara manual. Untuk melakukan ini, ubah partial_func kode agar terlihat seperti ini:

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    raise e # re-raise the issue

Opsi kedua, jika Anda ingin menangkap pengecualian dan mengabaikan sisa batch, adalah mengubah kode ke ini.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

# function to do nothing with a row
def do_nothing(row)
    pass

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Kode ini menggunakan do_nothing fungsi untuk mengabaikan sisa batch secara diam-diam.