Bagikan melalui


Apa itu pelacakan kemajuan asinkron?

Pelacakan kemajuan asinkron memungkinkan pipeline Streaming Terstruktur untuk mencatat kemajuan secara asinkron dan paralel dengan pemrosesan data aktual dalam mikro-batch, mengurangi latensi yang terkait dengan pemeliharaan offsetLog dan commitLog.

Pelacakan Kemajuan Asinkron

Nota

Pelacakan kemajuan asinkron tidak berfungsi dengan pemicu Trigger.once atau Trigger.availableNow. Mencoba mengaktifkan fitur ini dengan pemicu tersebut mengakibatkan kegagalan kueri.

Bagaimana cara kerja pelacakan kemajuan asinkron untuk mengurangi latensi?

Structured Streaming mengandalkan mempertahankan dan mengelola offset sebagai indikator kemajuan untuk pemrosesan kueri. Operasi manajemen offset secara langsung berdampak pada latensi pemrosesan, karena tidak ada pemrosesan data yang dapat terjadi sampai operasi ini selesai. Pelacakan kemajuan asinkron memungkinkan alur Streaming Terstruktur untuk kemajuan titik pemeriksaan tanpa terpengaruh oleh operasi manajemen offset ini.

Kapan Anda harus mengonfigurasi frekuensi titik pemeriksaan?

Pengguna dapat mengonfigurasi frekuensi pencatatan kemajuan. Pengaturan default untuk frekuensi titik pemeriksaan memberikan throughput yang baik untuk sebagian besar kueri. Mengonfigurasi frekuensi sangat membantu untuk skenario di mana operasi manajemen offset terjadi pada tingkat yang lebih tinggi daripada yang dapat diproses, yang menciptakan backlog operasi manajemen offset yang terus meningkat. Untuk membendung backlog yang berkembang ini, pemrosesan data diblokir atau diperlambat, pada dasarnya mengembalikan perilaku pemrosesan untuk menghilangkan manfaat pelacakan kemajuan asinkron.

Nota

Durasi pemulihan kegagalan meningkat seiring dengan bertambahnya durasi interval titik pemeriksaan. Jika terjadi kegagalan, alur pemrosesan harus memproses ulang semua data sebelum pos pemeriksaan yang berhasil sebelumnya. Pengguna dapat mempertimbangkan pertukaran ini antara latensi yang lebih rendah selama pemrosesan reguler dan waktu pemulihan jika terjadi kegagalan.

Konfigurasi apa yang terkait dengan pelacakan kemajuan asinkron?

Pilihan Nilai Bawaan Deskripsi
PelacakanKemajuanAsinkronDiaktifkan benar/salah palsu mengaktifkan atau menonaktifkan pelacakan kemajuan asinkron
asyncProgressTrackingCheckpointIntervalMs Milidetik 1000 interval waktu ketika kami mengkomit offset dan komitmen penyelesaian

Bagaimana pengguna dapat mengaktifkan pelacakan kemajuan asinkron?

Pengguna dapat menggunakan kode yang mirip dengan kode di bawah ini untuk mengaktifkan fitur ini:

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()

val query = stream.writeStream
     .format("kafka")
        .option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
     .start()

Menonaktifkan pelacakan kemajuan asinkron

Ketika pelacakan kemajuan asinkron diaktifkan, kerangka kerja tidak memeriksa kemajuan untuk setiap batch. Untuk mengatasi hal ini, sebelum Anda menonaktifkan pelacakan kemajuan asinkron, proses setidaknya dua batch mikro dengan pengaturan berikut:

  • .option("asyncProgressTrackingEnabled", "true")
  • .option("asyncProgressTrackingCheckpointIntervalMs", 0)

Hentikan kueri setelah setidaknya dua batch mikro selesai diproses. Sekarang Anda dapat menonaktifkan pelacakan kemajuan asinkron dengan aman dan memulai ulang kueri.

Jika Anda telah menonaktifkan pelacakan kemajuan asinkron tanpa menyelesaikan langkah ini, Anda mungkin mengalami kesalahan berikut:

java.lang.IllegalStateException: batch x doesn't exist

Di log driver, Anda mungkin melihat kesalahan berikut:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

Mengikuti instruksi di bagian ini untuk menonaktifkan pelacakan kemajuan asinkron memungkinkan Anda mengatasi kesalahan ini dan memperbaiki beban kerja streaming Anda.

Batasan dengan pelacakan kemajuan asinkron

Fitur ini memiliki batasan berikut:

  • Pelacakan kemajuan asinkron hanya didukung dalam pipeline bebas status saat menggunakan Kafka sebagai sink.
  • Pemrosesan end-to-end yang dilakukan tepat satu kali tidak dijamin dengan pelacakan kemajuan secara asinkron karena rentang offset untuk batch dapat diubah jika terjadi kegagalan. Beberapa sink, seperti Kafka, tidak pernah memberikan jaminan sekali persis.