Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
Az aszinkron folyamatkövetés csökkenti a strukturált streamelési folyamatok késését azáltal, hogy lehetővé teszi, hogy a lekérdezések aszinkron módon frissítsék az ellenőrzőpont állapotát és feldolgozzák az adatokat az egyes mikrokötegekben.
A lekérdezésfeldolgozás során a strukturált streamelés megőrzi és kezeli az eltolásokat az egyes mikrokötegek lekérdezési offsetLog folyamatának commitLog méréséhez. Az aszinkron folyamatkövetés nélkül az eltoláskezelési műveletek közvetlenül befolyásolják a feldolgozási késést, mert az adatfeldolgozás nem folytatható, amíg el nem fejeződnek.
Jegyzet
Az aszinkron folyamatkövetés nem kompatibilis a Trigger.once vagy Trigger.availableNow eseményindítókkal. Ha engedélyezve van, a strukturált streamelési lekérdezések vagy Trigger.once vagy Trigger.availableNow esetén sikertelenek.
Konfigurációs beállítások
| Opció | Alapértelmezett | Leírás |
|---|---|---|
asyncProgressTrackingEnabled |
false |
Az aszinkron folyamatkövetés engedélyezése. |
asyncProgressTrackingCheckpointIntervalMs |
1000 |
Az eltolások és a befejezési véglegesítések írása közötti ezredmásodpercben megadott intervallum. |
Aszinkron folyamatkövetés engedélyezése
Az aszinkron folyamatkövetés engedélyezéséhez állítsa a asyncProgressTrackingEnabled következőre true:
Python
stream = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
)
query = (stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
)
Scala
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
Az átviteli sebesség javítása ellenőrzőpont gyakoriságával
Az alapértelmezett 1000 milliszekundumos ellenőrzőpont-gyakoriság jó átviteli sebességet biztosít a legtöbb lekérdezéshez. Ha az eltoláskezelési műveletek gyorsabban történnek, mint az aszinkron folyamatkövetés feldolgozhatja őket, az eltoláskezelési műveletek hátraléka jön létre. A hátralék további növekedésének megakadályozása érdekében az aszinkron folyamatkövetés blokkolhatja vagy lelassíthatja az adatfeldolgozást, ami ronthatja a várt késési előnyöket.
Ebben a forgatókönyvben a Databricks az ellenőrzőpont-időköz növelését javasolja:
Python
query = (stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.option("asyncProgressTrackingCheckpointIntervalMs", "5000")
.start()
)
Scala
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.option("asyncProgressTrackingCheckpointIntervalMs", "5000")
.start()
Jegyzet
A hibahelyreállítási idő nő az ellenőrzőpont időközével. Hiba esetén a folyamatnak újra kell feldolgoznia az összes adatot az előző sikeres ellenőrzőpont óta. Mielőtt elvégezené ezt a változást az éles környezetben, vegye figyelembe a normál feldolgozás során fellépő kisebb késés és a meghibásodás esetén a helyreállítási idő közötti kompromisszumot.
Az aszinkron folyamatkövetés kikapcsolása
Ha az aszinkron folyamatkövetés engedélyezve van, a stream nem garantálja az ellenőrzőpont előrehaladását minden köteg esetében. Mielőtt kikapcsolhatja ezt a funkciót, mentenie kell a haladást.
A kikapcsoláshoz kövesse az alábbi lépéseket:
Legalább két mikrotétel feldolgozása, ahol
asyncProgressTrackingEnabledatrueértékre,asyncProgressTrackingCheckpointIntervalMspedig a0értékre van állítva.Python
query = (stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "true") .option("asyncProgressTrackingCheckpointIntervalMs", "0") .start() )Scala
val query = stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "true") .option("asyncProgressTrackingCheckpointIntervalMs", "0") .start()Állítsa le a lekérdezést:
Python
query.stop()Scala
query.stop()Kapcsolja ki az aszinkron folyamatkövetést, és indítsa újra a lekérdezést:
Python
query = (stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "false") .start() )Scala
val query = stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "false") .start()
Ha a fenti lépések végrehajtása nélkül kapcsolja ki az aszinkron folyamatkövetést, a következő hibaüzenet jelenhet meg:
java.lang.IllegalStateException: batch x doesn't exist
Az illesztőprogram-naplókban a következő hiba jelenhet meg:
The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
Limitations
- A Kafka-fogadók esetében az aszinkron folyamatkövetés csak az állapot nélküli csővezetékeket támogatja.
- Az aszinkron folyamatkövetés nem garantálja a pontos, végponttól végpontig tartó feldolgozást, mivel a köteg eltolási tartományai hiba esetén változhatnak. Egyes adatátviteli végpontok, például a Kafka, nem biztosítanak pontosan egyszeri garanciát.