Asynchronní sledování průběhu

Asynchronní sledování průběhu snižuje latenci kanálů strukturovaného streamování tím, že umožňuje dotazům asynchronně aktualizovat průběh kontrolního bodu a zpracovávat data v každé mikrodávce.

Během zpracování dotazů strukturované streamování persistuje a spravuje posuny, které měří pokrok dotazu v offsetLog a commitLog v každé mikrodávce. Bez asynchronního sledování průběhu operace řízení offsetu přímo ovlivňují latenci zpracování, protože zpracování dat nemůže pokračovat, dokud nejsou dokončeny.

Asynchronní sledování průběhu

Poznámka

Asynchronní sledování průběhu není kompatibilní se spouštěči Trigger.once nebo Trigger.availableNow. Pokud je tato možnost povolená, strukturované dotazy streamování s Trigger.once nebo Trigger.availableNow selhávají.

Možnosti konfigurace

Možnost Výchozí Popis
asyncProgressTrackingEnabled false Zda chcete povolit asynchronní sledování průběhu.
asyncProgressTrackingCheckpointIntervalMs 1000 Interval v milisekundách mezi zápisy pro přesuny a potvrzení dokončení.

Povolení asynchronního sledování průběhu

Pokud chcete povolit asynchronní sledování průběhu, nastavte asyncProgressTrackingEnabled na true:

Python

stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()
)

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()
)

Scala

val stream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "in")
    .load()

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .start()

Zlepšení propustnosti s využitím frekvence kontrolních bodů

Výchozí frekvence kontrolních 1000 bodů milisekund má pro většinu dotazů dobrou propustnost. Když k operacím správy posunu dochází rychleji, než je asynchronní sledování průběhu schopno zpracovat, vzniká zaostávání operací správy posunu. Aby se backlog dál nezvětšoval, asynchronní sledování průběhu může blokovat nebo zpomalit zpracování dat, což může potenciálně narušit očekávané výhody latence.

V tomto scénáři doporučuje Databricks zvýšit interval kontrolního bodu:

Python

query = (stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()
)

Scala

val query = stream.writeStream
    .format("kafka")
    .option("topic", "out")
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("asyncProgressTrackingEnabled", "true")
    .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
    .start()

Poznámka

Doba obnovení po selhání se zvyšuje s časem intervalu kontrolních bodů. V případě selhání musí kanál znovu zpracovat všechna data od předchozího úspěšného kontrolního bodu. Než provedete tuto změnu v produkčním prostředí, zvažte kompromis mezi nižší latencí během pravidelného zpracování v porovnání s časem obnovení v případě selhání.

Vypnutí asynchronního sledování průběhu

Pokud je aktivováno asynchronní sledování pokroku, stream nezaručuje pokrok v kontrolním bodě pro každou dávku. Než budete moct tuto funkci vypnout, musíte uložit průběh do kontrolního bodu.

Pokud chcete vypnout, postupujte takto:

  1. Zpracujte alespoň dvě mikrodávky s asyncProgressTrackingEnabled nastaveným na true a asyncProgressTrackingCheckpointIntervalMs nastaveným na 0.

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
        .option("asyncProgressTrackingCheckpointIntervalMs", "0")
        .start()
    
  2. Zastavte dotaz:

    Python

    query.stop()
    

    Scala

    query.stop()
    
  3. Vypněte asynchronní sledování průběhu a restartujte dotaz:

    Python

    query = (stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    )
    

    Scala

    val query = stream.writeStream
        .format("kafka")
        .option("topic", "out")
        .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "false")
        .start()
    

Pokud vypnete asynchronní sledování průběhu bez použití výše uvedených kroků, může dojít k následující chybě:

java.lang.IllegalStateException: batch x doesn't exist

V protokolech ovladačů se může zobrazit následující chyba:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

Omezení

  • U jímek Kafka podporuje asynchronní sledování průběhu pouze bezstavové potrubí.
  • Asynchronní sledování průběhu nezaručuje zpracování pouze jednou od začátku do konce, protože rozsahy offsetů pro dávku se můžou při selhání změnit. Některé jímky, například Kafka, nikdy neposkytují záruky přesně jednou.