Co to jest asynchroniczne śledzenie postępu?
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Śledzenie postępu asynchronicznego umożliwia potokom przesyłania strumieniowego ze strukturą asynchronicznie i równolegle do rzeczywistego przetwarzania danych w ramach mikrosadowej, co zmniejsza opóźnienie związane z konserwowaniem offsetLog
obiektów i commitLog
.
Uwaga
Śledzenie postępu asynchronicznego nie działa z wyzwalaczami Trigger.once
ani Trigger.availableNow
z nimi. Próba włączenia tej funkcji za pomocą tych wyzwalaczy powoduje niepowodzenie zapytania.
Jak działa asynchroniczne śledzenie postępu w celu zmniejszenia opóźnienia?
Przesyłanie strumieniowe ze strukturą polega na utrwalaniu przesunięć i zarządzaniu nimi jako wskaźników postępu przetwarzania zapytań. Operacja zarządzania przesunięciem ma bezpośredni wpływ na opóźnienie przetwarzania, ponieważ żadne przetwarzanie danych nie może nastąpić do momentu ukończenia tych operacji. Śledzenie postępu asynchronicznego umożliwia potokom przesyłania strumieniowego ze strukturą postęp punktów kontrolnych bez wpływu na te operacje zarządzania przesunięciem.
Kiedy należy skonfigurować częstotliwość punktów kontrolnych?
Użytkownicy mogą skonfigurować częstotliwość, z jaką jest punkt kontrolny postępu. Domyślne ustawienia częstotliwości punktów kontrolnych zapewniają dobrą przepływność dla większości zapytań. Skonfigurowanie częstotliwości jest przydatne w scenariuszach, w których operacje zarządzania przesunięciem występują z wyższą szybkością niż można je przetworzyć, co powoduje coraz większe zaległości operacji zarządzania przesunięciem. Aby powstrzymać tę rosnącą listę prac, przetwarzanie danych jest blokowane lub spowalniane, zasadniczo przywraca zachowanie przetwarzania, aby wyeliminować zalety śledzenia postępu asynchronicznego.
Uwaga
Czas odzyskiwania po awarii zwiększa się wraz ze wzrostem czasu interwału punktów kontrolnych. W przypadku awarii potok musi ponownie przetworzyć wszystkie dane przed poprzednim pomyślnym punktem kontrolnym. Użytkownicy mogą rozważyć ten kompromis między mniejszym opóźnieniem podczas regularnego przetwarzania i czasu odzyskiwania w przypadku awarii.
Jakie konfiguracje są skojarzone ze śledzeniem postępu asynchronicznego?
Opcja | Wartość | Domyślny | Opis |
---|---|---|---|
asyncProgressTrackingEnabled | prawda/fałsz | False | włączanie lub wyłączanie asynchronicznego śledzenia postępu |
asyncProgressTrackingCheckpointIntervalMs | Milisekund | 1000 | interwał zatwierdzania przesunięć i zatwierdzeń ukończenia |
Jak użytkownicy mogą włączyć śledzenie postępu asynchronicznego?
Użytkownicy mogą używać kodu podobnego do poniższego kodu, aby włączyć tę funkcję:
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
Wyłączanie asynchronicznego śledzenia postępu
Po włączeniu śledzenia postępu asynchronicznego platforma nie wykonuje postępu punktu kontrolnego dla każdej partii. Aby rozwiązać ten problem, przed wyłączeniem śledzenia postępu asynchronicznego należy przetworzyć co najmniej dwie mikrosady z następującymi ustawieniami:
.option("asyncProgressTrackingEnabled", "true")
.option("asyncProgressTrackingCheckpointIntervalMs", 0)
Zatrzymaj zapytanie po zakończeniu przetwarzania co najmniej dwóch mikrosadów. Teraz możesz bezpiecznie wyłączyć śledzenie postępu asynchronicznego i ponownie uruchomić zapytanie.
Jeśli śledzenie postępu asynchronicznego zostało wyłączone bez wykonywania tego kroku, może wystąpić następujący błąd:
java.lang.IllegalStateException: batch x doesn't exist
W dziennikach sterowników może zostać wyświetlony następujący błąd:
The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
Postępując zgodnie z instrukcjami w tej sekcji, aby wyłączyć śledzenie postępu asynchronicznego, można rozwiązać te błędy i naprawić obciążenie przesyłania strumieniowego.
Ograniczenia dotyczące śledzenia postępu asynchronicznego
Ta funkcja ma następujące ograniczenia:
- Śledzenie postępu asynchronicznego jest obsługiwane tylko w potokach bezstanowych w przypadku korzystania z platformy Kafka jako ujścia.
- Dokładnie raz kompleksowe przetwarzanie nie jest gwarantowane za pomocą asynchronicznego śledzenia postępu, ponieważ zakresy przesunięcia dla partii mogą zostać zmienione w przypadku awarii. Niektóre ujścia, takie jak kafka, nigdy nie zapewniają dokładnie raz gwarancji.