Zaman uyumsuz ilerleme izleme

Zaman uyumsuz ilerleme izleme, sorguların denetim noktası ilerleme durumunu zaman uyumsuz olarak güncelleştirmesini ve her mikro toplu işlemdeki verileri işlemesini sağlayarak Yapılandırılmış Akış işlem hatlarının gecikme süresini azaltır.

Sorgu işleme sırasında Yapılandırılmış Akış, her mikro toplu işlemde offsetLog ve commitLog içindeki sorgu ilerleme durumunu ölçmek için offsetleri kalıcı hale getirir ve yönetir. Eşzamansız ilerleme takibi olmadan, kaydırma yönetim işlemleri doğrudan işleme gecikmesini etkiler çünkü tamamlanmadan veri işlemi devam edemez.

Zaman Uyumsuz İlerleme İzleme

Not

Zaman uyumsuz ilerleme izleme, Trigger.once veya Trigger.availableNow tetikleyicilerle uyumlu değildir. Eğer etkinleştirilirse, Trigger.once veya Trigger.availableNow ile Yapılandırılmış Akış sorguları başarısız olur.

Yapılandırma seçenekleri

Seçenek Varsayılan Açıklama
asyncProgressTrackingEnabled false Zaman uyumsuz ilerleme izlemenin etkinleştirilip etkinleştirilmeyileceği.
asyncProgressTrackingCheckpointIntervalMs 1000 Kaydırmalar ve tamamlama işlemeleri için yazma işlemleri arasındaki milisaniye cinsinden aralık.

Zaman uyumsuz ilerleme izlemeyi etkinleştirme

Asenkron ilerleme izlemeyi etkinleştirmek için asyncProgressTrackingEnabled değerini true olarak ayarlayın.

Python

stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()
)

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()
)

Scala

val stream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()

Denetim noktası sıklığı ile aktarım hızını geliştirme

Milisaniyelik 1000 varsayılan denetim noktası sıklığı çoğu sorgu için iyi aktarım hızına sahiptir. Öteleme yönetimi işlemleri, zaman uyumsuz ilerleme izleme işlemlerinin işleyebileceğinden daha hızlı gerçekleştiğinde, bir yığılım oluşur. İş yükünün daha fazla büyümesini önlemek için, zaman uyumsuz ilerleme takibi veri işlemeyi engelleyebilir veya yavaşlatabilir ve bu durum potansiyel olarak beklenen gecikme süresi avantajlarını aşındırabilir.

Bu senaryoda Databricks, denetim noktası aralığını artırmanızı önerir:

Python

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()
)

Scala

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()

Not

Hata kurtarma süresi denetim noktası aralığı süresiyle artar. Hata durumunda, bir işlem hattının önceki başarılı denetim noktasından bu yana tüm verileri yeniden işlemesi gerekir. Üretimde bu değişikliği gerçekleştirmeden önce, hata durumunda kurtarma süresiyle karşılaştırıldığında normal işleme sırasında daha düşük gecikme süresi arasındaki dengeyi göz önünde bulundurun.

Zaman uyumsuz ilerleme izlemeyi kapatma

Zaman uyumsuz ilerleme izleme etkinleştirildiğinde, akış her toplu işlem için denetim noktası ilerleme durumunu garanti etmez. Bu özelliği kapatabilmeniz için önce denetim noktası ilerleme durumunu denetlemeniz gerekir.

Kapatmak için şu adımları izleyin:

  1. asyncProgressTrackingEnabled true olarak ayarlanmış ve asyncProgressTrackingCheckpointIntervalMs0 olarak ayarlanmış en az iki mikro toplu işlem işleyin:

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    
  2. Sorguyu durdurun:

    Python

    query.stop()
    

    Scala

    query.stop()
    
  3. Zaman uyumsuz ilerleme izlemeyi kapatın ve sorguyu yeniden başlatın:

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    

Yukarıdaki adımları izlemeden zaman uyumsuz ilerleme durumunu izlemeyi kapatırsanız aşağıdaki hatayla karşılaşabilirsiniz:

java.lang.IllegalStateException: batch x doesn't exist

Sürücü günlüklerinde aşağıdaki hatayı görebilirsiniz:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

Sınırlamalar

  • Kafka havuzları için zaman uyumsuz ilerleme izleme yalnızca durum bilgisi olmayan işlem hatlarını destekler.
  • Ofset aralıkları bir veri topluluğu için hata durumunda değişebileceğinden, eşzamansız ilerleme takibi tam olarak bir defa uçtan uca işlemeyi garanti etmez. Kafka gibi bazı havuzlar hiçbir zaman tam olarak bir kez garanti vermez.