Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Pelacakan kemajuan asinkron mengurangi latensi untuk alur Streaming Terstruktur dengan mengaktifkan kueri untuk memperbarui kemajuan titik pemeriksaan secara asinkron dan memproses data di setiap mikro-batch.
Selama pemrosesan kueri, Streaming Terstruktur menyimpan dan mengelola offset untuk mengukur kemajuan kueri di offsetLog dan commitLog di setiap batch mikro. Tanpa pelacakan kemajuan asinkron, operasi manajemen offset secara langsung memengaruhi latensi pemrosesan karena pemrosesan data tidak dapat dilanjutkan hingga selesai.
Pelacakan Kemajuan Asinkron
Nota
Pelacakan kemajuan asinkron tidak kompatibel dengan Trigger.once atau Trigger.availableNow pemicu. Jika diaktifkan, Kueri Streaming Terstruktur dengan Trigger.once atau Trigger.availableNow gagal.
Opsi konfigurasi
| Pilihan | Bawaan | Deskripsi |
|---|---|---|
asyncProgressTrackingEnabled |
false |
Apakah akan mengaktifkan pelacakan kemajuan asinkron. |
asyncProgressTrackingCheckpointIntervalMs |
1000 |
Interval dalam milidetik antara penulisan untuk offset dan komit penyelesaian. |
Mengaktifkan pelacakan kemajuan asinkron
Untuk mengaktifkan pelacakan kemajuan asinkron, atur asyncProgressTrackingEnabled ke true:
Python
stream = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
)
query = (stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
)
Scala
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
Meningkatkan throughput dengan frekuensi titik pemeriksaan
Frekuensi pemeriksaan default 1000 milidetik menghasilkan throughput yang baik untuk sebagian besar kueri. Ketika operasi manajemen offset terjadi lebih cepat daripada pelacakan kemajuan asinkron dapat memprosesnya, terjadi penumpukan operasi manajemen offset. Untuk mencegah backlog tumbuh lebih jauh, pelacakan kemajuan asinkron dapat memblokir atau memperlambat pemrosesan data, berpotensi mengikis manfaat latensi yang diharapkan.
Dalam skenario ini, Databricks merekomendasikan agar Anda meningkatkan interval titik pemeriksaan:
Python
query = (stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.option("asyncProgressTrackingCheckpointIntervalMs", "5000")
.start()
)
Scala
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.option("asyncProgressTrackingCheckpointIntervalMs", "5000")
.start()
Nota
Waktu pemulihan kegagalan meningkat seiring dengan interval waktu pos pemeriksaan. Jika terjadi kegagalan, alur harus memproses ulang semua data sejak titik pemeriksaan yang berhasil sebelumnya. Sebelum Anda membuat perubahan dalam produksi ini, pertimbangkan trade-off antara latensi yang lebih rendah selama pemrosesan reguler dibandingkan dengan waktu pemulihan jika terjadi kegagalan.
Menonaktifkan pelacakan kemajuan asinkron
Ketika pelacakan kemajuan asinkron diaktifkan, aliran tidak menjamin kemajuan titik pemeriksaan untuk setiap batch. Anda harus mencatat kemajuan sebelum dapat menonaktifkan fitur ini.
Untuk menonaktifkan, ikuti langkah-langkah berikut:
Proses setidaknya dua batch mikro dengan
asyncProgressTrackingEnabledatur ketruedanasyncProgressTrackingCheckpointIntervalMsatur ke0:Python
query = (stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "true") .option("asyncProgressTrackingCheckpointIntervalMs", "0") .start() )Scala
val query = stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "true") .option("asyncProgressTrackingCheckpointIntervalMs", "0") .start()Hentikan kueri:
Python
query.stop()Scala
query.stop()Nonaktifkan pelacakan kemajuan asinkron dan mulai ulang kueri:
Python
query = (stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "false") .start() )Scala
val query = stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "false") .start()
Jika Anda menonaktifkan pelacakan kemajuan asinkron tanpa mengikuti langkah-langkah di atas, Anda mungkin mengalami kesalahan berikut:
java.lang.IllegalStateException: batch x doesn't exist
Di log driver, Anda mungkin melihat kesalahan berikut:
The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
Keterbatasan
- Untuk sink Kafka, pelacakan kemajuan secara asinkron hanya mendukung pipeline tanpa status.
- Pelacakan kemajuan asinkron tidak menjamin pemrosesan end-to-end hanya sekali karena rentang offset untuk batch dapat berubah ketika terjadi kegagalan. Beberapa sink, seperti Kafka, tidak pernah memberikan jaminan sekali persis.