Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Asynchronní sledování průběhu snižuje latenci kanálů strukturovaného streamování tím, že umožňuje dotazům asynchronně aktualizovat průběh kontrolního bodu a zpracovávat data v každé mikrodávce.
Během zpracování dotazů strukturované streamování persistuje a spravuje posuny, které měří pokrok dotazu v offsetLog a commitLog v každé mikrodávce. Bez asynchronního sledování průběhu operace řízení offsetu přímo ovlivňují latenci zpracování, protože zpracování dat nemůže pokračovat, dokud nejsou dokončeny.
Poznámka
Asynchronní sledování průběhu není kompatibilní se spouštěči Trigger.once nebo Trigger.availableNow. Pokud je tato možnost povolená, strukturované dotazy streamování s Trigger.once nebo Trigger.availableNow selhávají.
Možnosti konfigurace
| Možnost | Výchozí | Popis |
|---|---|---|
asyncProgressTrackingEnabled |
false |
Zda chcete povolit asynchronní sledování průběhu. |
asyncProgressTrackingCheckpointIntervalMs |
1000 |
Interval v milisekundách mezi zápisy pro přesuny a potvrzení dokončení. |
Povolení asynchronního sledování průběhu
Pokud chcete povolit asynchronní sledování průběhu, nastavte asyncProgressTrackingEnabled na true:
Python
stream = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
)
query = (stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
)
Scala
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
Zlepšení propustnosti s využitím frekvence kontrolních bodů
Výchozí frekvence kontrolních 1000 bodů milisekund má pro většinu dotazů dobrou propustnost. Když k operacím správy posunu dochází rychleji, než je asynchronní sledování průběhu schopno zpracovat, vzniká zaostávání operací správy posunu. Aby se backlog dál nezvětšoval, asynchronní sledování průběhu může blokovat nebo zpomalit zpracování dat, což může potenciálně narušit očekávané výhody latence.
V tomto scénáři doporučuje Databricks zvýšit interval kontrolního bodu:
Python
query = (stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.option("asyncProgressTrackingCheckpointIntervalMs", "5000")
.start()
)
Scala
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.option("asyncProgressTrackingCheckpointIntervalMs", "5000")
.start()
Poznámka
Doba obnovení po selhání se zvyšuje s časem intervalu kontrolních bodů. V případě selhání musí kanál znovu zpracovat všechna data od předchozího úspěšného kontrolního bodu. Než provedete tuto změnu v produkčním prostředí, zvažte kompromis mezi nižší latencí během pravidelného zpracování v porovnání s časem obnovení v případě selhání.
Vypnutí asynchronního sledování průběhu
Pokud je aktivováno asynchronní sledování pokroku, stream nezaručuje pokrok v kontrolním bodě pro každou dávku. Než budete moct tuto funkci vypnout, musíte uložit průběh do kontrolního bodu.
Pokud chcete vypnout, postupujte takto:
Zpracujte alespoň dvě mikrodávky s
asyncProgressTrackingEnablednastaveným natrueaasyncProgressTrackingCheckpointIntervalMsnastaveným na0.Python
query = (stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "true") .option("asyncProgressTrackingCheckpointIntervalMs", "0") .start() )Scala
val query = stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "true") .option("asyncProgressTrackingCheckpointIntervalMs", "0") .start()Zastavte dotaz:
Python
query.stop()Scala
query.stop()Vypněte asynchronní sledování průběhu a restartujte dotaz:
Python
query = (stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "false") .start() )Scala
val query = stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "false") .start()
Pokud vypnete asynchronní sledování průběhu bez použití výše uvedených kroků, může dojít k následující chybě:
java.lang.IllegalStateException: batch x doesn't exist
V protokolech ovladačů se může zobrazit následující chyba:
The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
Omezení
- U jímek Kafka podporuje asynchronní sledování průběhu pouze bezstavové potrubí.
- Asynchronní sledování průběhu nezaručuje zpracování pouze jednou od začátku do konce, protože rozsahy offsetů pro dávku se můžou při selhání změnit. Některé jímky, například Kafka, nikdy neposkytují záruky přesně jednou.