Pelacakan kemajuan asinkron

Pelacakan kemajuan asinkron mengurangi latensi untuk alur Streaming Terstruktur dengan mengaktifkan kueri untuk memperbarui kemajuan titik pemeriksaan secara asinkron dan memproses data di setiap mikro-batch.

Selama pemrosesan kueri, Streaming Terstruktur menyimpan dan mengelola offset untuk mengukur kemajuan kueri di offsetLog dan commitLog di setiap batch mikro. Tanpa pelacakan kemajuan asinkron, operasi manajemen offset secara langsung memengaruhi latensi pemrosesan karena pemrosesan data tidak dapat dilanjutkan hingga selesai.

Pelacakan Kemajuan Asinkron

Nota

Pelacakan kemajuan asinkron tidak kompatibel dengan Trigger.once atau Trigger.availableNow pemicu. Jika diaktifkan, Kueri Streaming Terstruktur dengan Trigger.once atau Trigger.availableNow gagal.

Opsi konfigurasi

Pilihan Bawaan Deskripsi
asyncProgressTrackingEnabled false Apakah akan mengaktifkan pelacakan kemajuan asinkron.
asyncProgressTrackingCheckpointIntervalMs 1000 Interval dalam milidetik antara penulisan untuk offset dan komit penyelesaian.

Mengaktifkan pelacakan kemajuan asinkron

Untuk mengaktifkan pelacakan kemajuan asinkron, atur asyncProgressTrackingEnabled ke true:

Python

stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()
)

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()
)

Scala

val stream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()

Meningkatkan throughput dengan frekuensi titik pemeriksaan

Frekuensi pemeriksaan default 1000 milidetik menghasilkan throughput yang baik untuk sebagian besar kueri. Ketika operasi manajemen offset terjadi lebih cepat daripada pelacakan kemajuan asinkron dapat memprosesnya, terjadi penumpukan operasi manajemen offset. Untuk mencegah backlog tumbuh lebih jauh, pelacakan kemajuan asinkron dapat memblokir atau memperlambat pemrosesan data, berpotensi mengikis manfaat latensi yang diharapkan.

Dalam skenario ini, Databricks merekomendasikan agar Anda meningkatkan interval titik pemeriksaan:

Python

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()
)

Scala

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()

Nota

Waktu pemulihan kegagalan meningkat seiring dengan interval waktu pos pemeriksaan. Jika terjadi kegagalan, alur harus memproses ulang semua data sejak titik pemeriksaan yang berhasil sebelumnya. Sebelum Anda membuat perubahan dalam produksi ini, pertimbangkan trade-off antara latensi yang lebih rendah selama pemrosesan reguler dibandingkan dengan waktu pemulihan jika terjadi kegagalan.

Menonaktifkan pelacakan kemajuan asinkron

Ketika pelacakan kemajuan asinkron diaktifkan, aliran tidak menjamin kemajuan titik pemeriksaan untuk setiap batch. Anda harus mencatat kemajuan sebelum dapat menonaktifkan fitur ini.

Untuk menonaktifkan, ikuti langkah-langkah berikut:

  1. Proses setidaknya dua batch mikro dengan asyncProgressTrackingEnabled atur ke true dan asyncProgressTrackingCheckpointIntervalMs atur ke 0:

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    
  2. Hentikan kueri:

    Python

    query.stop()
    

    Scala

    query.stop()
    
  3. Nonaktifkan pelacakan kemajuan asinkron dan mulai ulang kueri:

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    

Jika Anda menonaktifkan pelacakan kemajuan asinkron tanpa mengikuti langkah-langkah di atas, Anda mungkin mengalami kesalahan berikut:

java.lang.IllegalStateException: batch x doesn't exist

Di log driver, Anda mungkin melihat kesalahan berikut:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

Keterbatasan

  • Untuk sink Kafka, pelacakan kemajuan secara asinkron hanya mendukung pipeline tanpa status.
  • Pelacakan kemajuan asinkron tidak menjamin pemrosesan end-to-end hanya sekali karena rentang offset untuk batch dapat berubah ketika terjadi kegagalan. Beberapa sink, seperti Kafka, tidak pernah memberikan jaminan sekali persis.