Sdílet prostřednictvím


Co je asynchronní sledování průběhu?

Důležitý

Tato funkce je ve verzi Public Preview.

Asynchronní sledování průběhu umožňuje tokům strukturovaného streamování asynchronně a paralelně sledovat pokrok se skutečným zpracováním dat v rámci mikrodávky, což snižuje latenci spojenou s údržbou offsetLog a commitLog.

asynchronního sledování průběhu

Poznámka

Asynchronní sledování průběhu nefunguje s aktivačními událostmi Trigger.once ani Trigger.availableNow. Při pokusu o povolení této funkce s těmito triggery dochází k selhání dotazu.

Jak asynchronní sledování průběhu funguje, aby se snížila latence?

Strukturované streamování závisí na zachování a správě posunů jako indikátorů průběhu zpracování dotazů. Operace správy posunu má přímý vliv na latenci zpracování, protože dokud se tyto operace nedokončí, nemůže dojít ke zpracování dat. Asynchronní sledování průběhu umožňuje pipeline pro strukturované streamování provádět kontrolní bod postupu, aniž by byly ovlivněny těmito operacemi řízení offsetu.

Kdy byste měli nakonfigurovat frekvenci kontrolních bodů?

Uživatelé můžou nakonfigurovat frekvenci, při které je průběh kontrolních bodů. Výchozí nastavení frekvence kontrolních bodů poskytují dobrou propustnost pro většinu dotazů. Konfigurace četnosti je užitečná pro scénáře, ve kterých dochází k operacím správy posunu s vyšší rychlostí, než je možné zpracovat, což vytváří stále rostoucí backlog operací správy posunu. Aby se tento rostoucí backlog zastavil, zpracování dat se zablokuje nebo zpomalí, což v podstatě vrací chování zpracování a eliminuje výhody asynchronního sledování průběhu.

Poznámka

Čas obnovy po selhání se zvyšuje s nárůstem intervalu kontrolního bodu. V případě selhání musí kanál znovu zpracovat všechna data před předchozím úspěšným kontrolním bodem. Uživatelé můžou tento kompromis zvážit mezi nižší latencí během pravidelného zpracování a doby obnovení v případě selhání.

Jaké konfigurace se vážou k asynchronnímu sledování postupu?

Možnost Hodnota Výchozí Popis
sledováníProgresuAsynchronníZapnuto pravda/nepravda falešný povolení nebo zakázání asynchronního sledování průběhu
asyncProgressTrackingCheckpointIntervalMs (interval kontrolního bodu asynchronního sledování pokroku v milisekundách) milisekundy 1000 interval, ve kterém provádíme potvrzení posunů a závěrečná potvrzení dokončení

Jak můžou uživatelé povolit asynchronní sledování průběhu?

Uživatelé můžou k povolení této funkce použít kód podobný následujícímu kódu:

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()

val query = stream.writeStream
     .format("kafka")
        .option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
        .option("asyncProgressTrackingEnabled", "true")
     .start()

Vypnutí asynchronního sledování průběhu

Pokud je povolené sledování asynchronního průběhu, framework neprovádí kontrolní bod průběhu pro každou dávku. Pokud chcete tento postup vyřešit, před zakázáním asynchronního sledování průběhu zpracujte alespoň dvě mikrodávkové dávky s následujícím nastavením:

  • .option("asyncProgressTrackingEnabled", "true")
  • .option("asyncProgressTrackingCheckpointIntervalMs", 0)

Zastavte dotaz po dokončení zpracování alespoň dvou mikrodávek. Teď můžete bezpečně zakázat sledování asynchronního průběhu a restartovat dotaz.

Pokud jste zakázali asynchronní sledování průběhu bez dokončení tohoto kroku, může dojít k následující chybě:

java.lang.IllegalStateException: batch x doesn't exist

V protokolech ovladačů se může zobrazit následující chyba:

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

Podle pokynů v této části zakažte asynchronní sledování průběhu, které vám umožní vyřešit tyto chyby a opravit úlohy streamování.

Omezení s asynchronním sledováním průběhu

Tato funkce má následující omezení:

  • Asynchronní sledování průběhu je možné pouze v bezstavových pipelinech při použití Kafka jako jímku.
  • Zpracování přesně jednou od začátku do konce není zaručeno při asynchronním sledování průběhu, protože rozsahy posunů pro dávku lze změnit v případě selhání. Některé jímky, například Kafka, nikdy neposkytují záruky přesně jednou.