Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Śledzenie postępu asynchronicznego umożliwia potokom przetwarzania strumieniowego w oparciu o struktury zapisy postępu asynchronicznie i równolegle z rzeczywistym przetwarzaniem danych w ramach mikropartii, co zmniejsza opóźnienia związane z utrzymywaniem offsetLog i commitLog.
Notatka
Śledzenie postępu asynchronicznego nie działa z wyzwalaczami Trigger.once ani Trigger.availableNow. Próba włączenia tej funkcji za pomocą tych wyzwalaczy powoduje niepowodzenie zapytania.
Jak działa śledzenie postępu asynchronicznego w celu zmniejszenia opóźnienia?
Strukturalne przesyłanie strumieniowe opiera się na utrwalaniu przesunięć i zarządzaniu nimi jako wskaźników postępu przetwarzania zapytań. Operacja zarządzania przesunięciem ma bezpośredni wpływ na opóźnienie przetwarzania, ponieważ przetwarzanie danych nie może nastąpić do momentu ukończenia tych operacji. Asynchroniczne śledzenie postępu umożliwia zorganizowanym potokom przesyłania strumieniowego tworzenie punktów kontrolnych bez wpływu na operacje zarządzania przesunięciami.
Kiedy należy skonfigurować częstotliwość punktów kontrolnych?
Użytkownicy mogą skonfigurować częstotliwość zapisywania postępu. Domyślne ustawienia częstotliwości punktów kontrolnych zapewniają dobrą przepływność dla większości zapytań. Skonfigurowanie częstotliwości jest przydatne w scenariuszach, w których operacje zarządzania przesunięciem występują z większą szybkością niż można je przetworzyć, co powoduje coraz większe zaległości operacji zarządzania przesunięciem. Aby powstrzymać te rosnące zaległości, przetwarzanie danych jest blokowane lub spowalniane, co zasadniczo przywraca mniej efektywne zachowanie przetwarzania, eliminując korzyści z asynchronicznego śledzenia postępu.
Notatka
Czas odzyskiwania po awarii zwiększa się wraz ze wzrostem interwału punktów kontrolnych. W przypadku awarii potok musi ponownie przetworzyć wszystkie dane przed ostatnim udanym punktem kontrolnym. Użytkownicy mogą rozważyć ten kompromis między mniejszym opóźnieniem podczas regularnego przetwarzania i czasu odzyskiwania w przypadku awarii.
Jakie konfiguracje są skojarzone ze śledzeniem postępu asynchronicznego?
| Opcja | Wartość | Domyślny | Opis |
|---|---|---|---|
| włączone śledzenie postępów asynchronicznych | prawda/fałsz | fałszywy | włączanie lub wyłączanie śledzenia postępu asynchronicznego |
| asyncProgressTrackingCheckpointIntervalMs (asynchroniczny interwał czasu kontrolnego śledzenia postępu w ms) | Milisekund | 1000 | przedział czasowy, w którym realizujemy przesunięcia i zatwierdzenia |
Jak użytkownicy mogą włączyć śledzenie postępu asynchronicznego?
Użytkownicy mogą używać kodu podobnego do poniższego kodu, aby włączyć tę funkcję:
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
Wyłączanie śledzenia postępu asynchronicznego
Po włączeniu śledzenia postępu asynchronicznego framework nie zapisuje postępu dla każdej partii. Aby rozwiązać ten problem, zanim wyłączysz śledzenie postępu asynchronicznego, należy przetworzyć co najmniej dwie mikrosady z następującymi ustawieniami:
.option("asyncProgressTrackingEnabled", "true").option("asyncProgressTrackingCheckpointIntervalMs", 0)
Zatrzymaj zapytanie po zakończeniu przetwarzania co najmniej dwóch mikropartii. Teraz możesz bezpiecznie wyłączyć śledzenie postępu asynchronicznego i ponownie uruchomić zapytanie.
Jeśli śledzenie postępu asynchronicznego zostało wyłączone bez wykonywania tego kroku, może wystąpić następujący błąd:
java.lang.IllegalStateException: batch x doesn't exist
W dziennikach sterowników może zostać wyświetlony następujący błąd:
The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
Postępując zgodnie z instrukcjami w tej sekcji, aby wyłączyć śledzenie postępu asynchronicznego, można rozwiązać te błędy i naprawić obciążenie przesyłania strumieniowego.
Ograniczenia dotyczące śledzenia postępu asynchronicznego
Ta funkcja ma następujące ograniczenia:
- Śledzenie postępu asynchronicznego jest obsługiwane tylko w potokach bezstanowych podczas korzystania z platformy Kafka jako ujścia.
- Dokładnie raz przetwarzanie od początku do końca nie jest gwarantowane z asynchronicznym śledzeniem postępu, ponieważ zakresy przesunięć dla partii mogą zostać zmienione w przypadku awarii. Niektóre ujścia, takie jak Kafka, nigdy nie zapewniają gwarancji typu exactly-once.