Co je asynchronní sledování průběhu?
Důležitý
Tato funkce je ve verzi Public Preview.
Asynchronní sledování průběhu umožňuje tokům strukturovaného streamování asynchronně a paralelně sledovat pokrok se skutečným zpracováním dat v rámci mikrodávky, což snižuje latenci spojenou s údržbou offsetLog
a commitLog
.
Poznámka
Asynchronní sledování průběhu nefunguje s aktivačními událostmi Trigger.once
ani Trigger.availableNow
. Při pokusu o povolení této funkce s těmito triggery dochází k selhání dotazu.
Jak asynchronní sledování průběhu funguje, aby se snížila latence?
Strukturované streamování závisí na zachování a správě posunů jako indikátorů průběhu zpracování dotazů. Operace správy posunu má přímý vliv na latenci zpracování, protože dokud se tyto operace nedokončí, nemůže dojít ke zpracování dat. Asynchronní sledování průběhu umožňuje pipeline pro strukturované streamování provádět kontrolní bod postupu, aniž by byly ovlivněny těmito operacemi řízení offsetu.
Kdy byste měli nakonfigurovat frekvenci kontrolních bodů?
Uživatelé můžou nakonfigurovat frekvenci, při které je průběh kontrolních bodů. Výchozí nastavení frekvence kontrolních bodů poskytují dobrou propustnost pro většinu dotazů. Konfigurace četnosti je užitečná pro scénáře, ve kterých dochází k operacím správy posunu s vyšší rychlostí, než je možné zpracovat, což vytváří stále rostoucí backlog operací správy posunu. Aby se tento rostoucí backlog zastavil, zpracování dat se zablokuje nebo zpomalí, což v podstatě vrací chování zpracování a eliminuje výhody asynchronního sledování průběhu.
Poznámka
Čas obnovy po selhání se zvyšuje s nárůstem intervalu kontrolního bodu. V případě selhání musí kanál znovu zpracovat všechna data před předchozím úspěšným kontrolním bodem. Uživatelé můžou tento kompromis zvážit mezi nižší latencí během pravidelného zpracování a doby obnovení v případě selhání.
Jaké konfigurace se vážou k asynchronnímu sledování postupu?
Možnost | Hodnota | Výchozí | Popis |
---|---|---|---|
sledováníProgresuAsynchronníZapnuto | pravda/nepravda | falešný | povolení nebo zakázání asynchronního sledování průběhu |
asyncProgressTrackingCheckpointIntervalMs (interval kontrolního bodu asynchronního sledování pokroku v milisekundách) | milisekundy | 1000 | interval, ve kterém provádíme potvrzení posunů a závěrečná potvrzení dokončení |
Jak můžou uživatelé povolit asynchronní sledování průběhu?
Uživatelé můžou k povolení této funkce použít kód podobný následujícímu kódu:
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
Vypnutí asynchronního sledování průběhu
Pokud je povolené sledování asynchronního průběhu, framework neprovádí kontrolní bod průběhu pro každou dávku. Pokud chcete tento postup vyřešit, před zakázáním asynchronního sledování průběhu zpracujte alespoň dvě mikrodávkové dávky s následujícím nastavením:
.option("asyncProgressTrackingEnabled", "true")
.option("asyncProgressTrackingCheckpointIntervalMs", 0)
Zastavte dotaz po dokončení zpracování alespoň dvou mikrodávek. Teď můžete bezpečně zakázat sledování asynchronního průběhu a restartovat dotaz.
Pokud jste zakázali asynchronní sledování průběhu bez dokončení tohoto kroku, může dojít k následující chybě:
java.lang.IllegalStateException: batch x doesn't exist
V protokolech ovladačů se může zobrazit následující chyba:
The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
Podle pokynů v této části zakažte asynchronní sledování průběhu, které vám umožní vyřešit tyto chyby a opravit úlohy streamování.
Omezení s asynchronním sledováním průběhu
Tato funkce má následující omezení:
- Asynchronní sledování průběhu je možné pouze v bezstavových pipelinech při použití Kafka jako jímku.
- Zpracování přesně jednou od začátku do konce není zaručeno při asynchronním sledování průběhu, protože rozsahy posunů pro dávku lze změnit v případě selhání. Některé jímky, například Kafka, nikdy neposkytují záruky přesně jednou.