Megosztás a következőn keresztül:


Mi az aszinkron folyamatkövetés?

Az aszinkron folyamatkövetés lehetővé teszi, hogy a strukturált streamelési folyamatok aszinkron módon és párhuzamosan ellenőrizzék az előrehaladást a mikrokötegen belüli tényleges adatfeldolgozással, csökkentve a és commitLoga fenntartásához kapcsolódó késéstoffsetLog.

Aszinkron folyamatkövetés

Megjegyzés

Az aszinkron folyamatkövetés nem működik a vagy Trigger.availableNow az eseményindítókkalTrigger.once. Ha ezekkel az eseményindítókkal megkísérli engedélyezni ezt a funkciót, lekérdezési hiba jelenik meg.

Hogyan működik az aszinkron folyamatkövetés a késés csökkentése érdekében?

A strukturált streamelés az eltolások megőrzésére és kezelésére támaszkodik a lekérdezésfeldolgozás folyamatjelzőjeként. Az eltoláskezelési művelet közvetlenül befolyásolja a feldolgozási késést, mivel a műveletek befejezéséig nem történhet adatfeldolgozás. Az aszinkron folyamatkövetés lehetővé teszi, hogy a strukturált streamelési folyamatok az eltoláskezelési műveletek befolyásolása nélkül ellenőrizzék a folyamat előrehaladását.

Mikor érdemes beállítani az ellenőrzőpont gyakoriságát?

A felhasználók konfigurálhatják az állapot ellenőrzésének gyakoriságát. Az ellenőrzőpont-gyakoriság alapértelmezett beállításai jó átviteli sebességet biztosítanak a legtöbb lekérdezéshez. A gyakoriság konfigurálása olyan forgatókönyvek esetében hasznos, amelyekben az eltoláskezelési műveletek a feldolgozhatónál magasabb sebességgel történnek, ami egyre nagyobb hátralékot eredményez az eltoláskezelési műveleteknél. A növekvő teendőlista elkerülése érdekében az adatfeldolgozás le van tiltva vagy lelassul, lényegében visszaállítva a feldolgozási viselkedést az aszinkron folyamatkövetés előnyeinek kiküszöbölése érdekében.

Megjegyzés

A sikertelen helyreállítás ideje az ellenőrzőpont-intervallum időtartamának növekedésével nő. Hiba esetén a folyamatnak újra fel kell dolgoznia az összes adatot az előző sikeres ellenőrzőpont előtt. A felhasználók megfontolhatják ezt a kompromisszumot a normál feldolgozás során jelentkező kisebb késés és meghibásodás esetén a helyreállítási idő között.

Milyen konfigurációk vannak társítva az aszinkron folyamatkövetéshez?

Beállítás Value Alapértelmezett Leírás
asyncProgressTrackingEnabled Igaz/hamis false aszinkron folyamatkövetés engedélyezése vagy letiltása
asyncProgressTrackingCheckpointIntervalMs ezredmásodperc 1000 az eltolások és befejezési véglegesítések véglegesítési időköze

Hogyan engedélyezhetik a felhasználók az aszinkron folyamatkövetést?

A felhasználók az alábbi kódhoz hasonló kódot használhatnak a funkció engedélyezéséhez:

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()

val query = stream.writeStream
     .format("kafka")
        .option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
     .start()

Az aszinkron folyamatkövetés kikapcsolása

Ha az aszinkron folyamatkövetés engedélyezve van, a keretrendszer nem ellenőrzi az egyes kötegek állapotát. Ennek megoldásához az aszinkron folyamatkövetés letiltása előtt dolgozzon fel legalább két mikroköteget a következő beállításokkal:

  • .option("asyncProgressTrackingEnabled", "true")
  • .option("asyncProgressTrackingCheckpointIntervalMs", 0)

Állítsa le a lekérdezést, miután legalább két mikroköteg feldolgozása befejeződött. Most már biztonságosan letilthatja az aszinkron folyamatkövetést, és újraindíthatja a lekérdezést.

Ha a lépés végrehajtása nélkül letiltotta az aszinkron folyamatkövetést, a következő hibaüzenet jelenhet meg:

java.lang.IllegalStateException: batch x doesn't exist

Az illesztőnaplókban a következő hibaüzenet jelenhet meg:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

Az aszinkron folyamatkövetés letiltására vonatkozó szakasz utasításait követve megoldhatja ezeket a hibákat, és kijavíthatja a streamelési számítási feladatot.

Az aszinkron folyamatkövetés korlátozásai

Ez a funkció a következő korlátozásokkal rendelkezik:

  • Az aszinkron folyamatkövetés csak állapot nélküli folyamatokban támogatott, ha a Kafkát fogadóként használja.
  • A végpontok közötti feldolgozás nem garantált az aszinkron folyamatkövetéssel, mivel a köteg eltolási tartományai hiba esetén módosíthatók. Egyes fogadók, mint például a Kafka, soha nem biztosítanak pontosan egyszeri garanciákat.