Aszinkron folyamatkövetés

Az aszinkron folyamatkövetés csökkenti a strukturált streamelési folyamatok késését azáltal, hogy lehetővé teszi, hogy a lekérdezések aszinkron módon frissítsék az ellenőrzőpont állapotát és feldolgozzák az adatokat az egyes mikrokötegekben.

A lekérdezésfeldolgozás során a strukturált streamelés megőrzi és kezeli az eltolásokat az egyes mikrokötegek lekérdezési offsetLog folyamatának commitLog méréséhez. Az aszinkron folyamatkövetés nélkül az eltoláskezelési műveletek közvetlenül befolyásolják a feldolgozási késést, mert az adatfeldolgozás nem folytatható, amíg el nem fejeződnek.

aszinkron folyamatkövetési

Jegyzet

Az aszinkron folyamatkövetés nem kompatibilis a Trigger.once vagy Trigger.availableNow eseményindítókkal. Ha engedélyezve van, a strukturált streamelési lekérdezések vagy Trigger.once vagy Trigger.availableNow esetén sikertelenek.

Konfigurációs beállítások

Opció Alapértelmezett Leírás
asyncProgressTrackingEnabled false Az aszinkron folyamatkövetés engedélyezése.
asyncProgressTrackingCheckpointIntervalMs 1000 Az eltolások és a befejezési véglegesítések írása közötti ezredmásodpercben megadott intervallum.

Aszinkron folyamatkövetés engedélyezése

Az aszinkron folyamatkövetés engedélyezéséhez állítsa a asyncProgressTrackingEnabled következőre true:

Python

stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()
)

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()
)

Scala

val stream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()

Az átviteli sebesség javítása ellenőrzőpont gyakoriságával

Az alapértelmezett 1000 milliszekundumos ellenőrzőpont-gyakoriság jó átviteli sebességet biztosít a legtöbb lekérdezéshez. Ha az eltoláskezelési műveletek gyorsabban történnek, mint az aszinkron folyamatkövetés feldolgozhatja őket, az eltoláskezelési műveletek hátraléka jön létre. A hátralék további növekedésének megakadályozása érdekében az aszinkron folyamatkövetés blokkolhatja vagy lelassíthatja az adatfeldolgozást, ami ronthatja a várt késési előnyöket.

Ebben a forgatókönyvben a Databricks az ellenőrzőpont-időköz növelését javasolja:

Python

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()
)

Scala

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()

Jegyzet

A hibahelyreállítási idő nő az ellenőrzőpont időközével. Hiba esetén a folyamatnak újra kell feldolgoznia az összes adatot az előző sikeres ellenőrzőpont óta. Mielőtt elvégezené ezt a változást az éles környezetben, vegye figyelembe a normál feldolgozás során fellépő kisebb késés és a meghibásodás esetén a helyreállítási idő közötti kompromisszumot.

Az aszinkron folyamatkövetés kikapcsolása

Ha az aszinkron folyamatkövetés engedélyezve van, a stream nem garantálja az ellenőrzőpont előrehaladását minden köteg esetében. Mielőtt kikapcsolhatja ezt a funkciót, mentenie kell a haladást.

A kikapcsoláshoz kövesse az alábbi lépéseket:

  1. Legalább két mikrotétel feldolgozása, ahol asyncProgressTrackingEnabled a true értékre, asyncProgressTrackingCheckpointIntervalMs pedig a 0 értékre van állítva.

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    
  2. Állítsa le a lekérdezést:

    Python

    query.stop()
    

    Scala

    query.stop()
    
  3. Kapcsolja ki az aszinkron folyamatkövetést, és indítsa újra a lekérdezést:

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    

Ha a fenti lépések végrehajtása nélkül kapcsolja ki az aszinkron folyamatkövetést, a következő hibaüzenet jelenhet meg:

java.lang.IllegalStateException: batch x doesn't exist

Az illesztőprogram-naplókban a következő hiba jelenhet meg:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

Limitations

  • A Kafka-fogadók esetében az aszinkron folyamatkövetés csak az állapot nélküli csővezetékeket támogatja.
  • Az aszinkron folyamatkövetés nem garantálja a pontos, végponttól végpontig tartó feldolgozást, mivel a köteg eltolási tartományai hiba esetén változhatnak. Egyes adatátviteli végpontok, például a Kafka, nem biztosítanak pontosan egyszeri garanciát.