Sdílet prostřednictvím


Výběr výstupního režimu pro strukturované streamování

Tento článek popisuje výběr výstupního režimu pro stavové streamování. Konfigurace výstupního režimu vyžadují pouze stavové datové proudy obsahující agregace.

Spojení podporují pouze režim výstupu připojení a výstupní režim nemá vliv na odstranění duplicitních dat. Libovolné stavové operátory mapGroupsWithState a flatMapGroupsWithState generují záznamy pomocí vlastní logiky, takže výstupní režim streamu nemá vliv na jejich chování.

U bezstavového streamování se všechny režimy výstupu chovají stejně.

Pokud chcete správně nakonfigurovat režim výstupu, musíte rozumět stavovým streamováním, vodoznakům a aktivačním událostem. Podívejte se na následující články:

Co je výstupní režim?

Výstupní režim dotazu strukturovaného streamování určuje, které záznamy operátory dotazu generují během každého triggeru. Existují tři typy záznamů, které je možné vygenerovat:

  • Záznamy, které se v budoucnu zpracovávají, se nezmění.
  • Záznamy, které se od posledního triggeru změnily.
  • Všechny záznamy ve stavové tabulce

Znalost typů záznamů, které se mají generovat, je důležitá pro stavové operátory, protože konkrétní řádek vytvořený stavovým operátorem se může změnit z triggeru na trigger. Když operátor streamované agregace přijímá více řádků pro určité okno, mohou se hodnoty agregace tohoto okna měnit při každém spuštění.

U bezstavových operátorů rozdíl mezi typy záznamů nemá vliv na chování operátoru. Záznamy, které bezstavový operátor generuje během triggeru, jsou vždy zdrojovými záznamy zpracovanými během tohoto triggeru.

Dostupné výstupní režimy

Existují tři výstupní režimy, které operátoru říkají, které záznamy se mají vygenerovat během konkrétního triggeru:

Výstupní režim Popis
Režim připojení (výchozí) Ve výchozím nastavení se streamované dotazy spouštějí v režimu připojení. V tomto režimu operátory generují pouze řádky, které se v budoucích aktivačních událostech nemění. Stavové operátory používají vodoznak (watermark) k určení, kdy k tomu dojde.
Režim aktualizace V režimu aktualizace generují operátory všechny řádky, které se během triggeru změnily, i když se v následném triggeru může vygenerovaný záznam změnit.
Režim dokončení Úplný režim funguje jenom s agregacemi streamování. V úplném režimu se všechny výsledné řádky vytvořené operátorem vygenerují v podřízené oblasti.

Důležité informace o produkčním prostředí

U mnoha operací stavového streamování musíte zvolit mezi režimem přidání a aktualizace. Následující části popisují aspekty, které by mohly vaše rozhodnutí informovat.

Poznámka:

Úplný režim má některé aplikace, ale může fungovat špatně při škálování dat. Databricks doporučuje používat materializovaná zobrazení k získání sémantických záruk spojených s úplným režimem s přírůstkovým zpracováním pro mnoho stavových operací. Viz Použití materializovaných zobrazení v Databricks SQL.

Sémantika aplikace

Sémantika aplikací popisuje, jak podřízené aplikace používají streamovaná data.

Pokud podřízené služby potřebují provést jednu akci pro každý podřízený zápis, použijte ve většině případů režim připojení. Pokud máte například podřízenou službu oznámení, která odesílá oznámení pro každý nový záznam zapsaný do jímky, režim připojení zajistí, že každý záznam bude zapsán pouze jednou. Režim aktualizace zapíše záznam pokaždé, když se změní informace o stavu, což by vedlo k mnoha aktualizacím.

Pokud podřízené služby potřebují nové výsledky, režim aktualizace zajistí, aby vaše jímka zůstala co up-to-date. Mezi příklady patří model strojového učení, který čte funkce v reálném čase nebo analytický řídicí panel sledující agregace v reálném čase.

Kompatibilita operátorů a jímky

Strukturované streamování nepodporuje všechny operace dostupné v Apache Sparku a některé operace streamování nejsou podporované ve všech režimech výstupu. Další informace o omezeních operátorů najdete v dokumentaci ke streamování OSS.

Ne všechny jímky podporují všechny režimy výstupu. Delta Lake, která zálohuje všechny spravované tabulky Katalogu Unity, i Kafka podporují všechny režimy výstupu. Další informace o kompatibilitě jímky najdete v dokumentaci ke streamování OSS.

Latence a náklady

Výstupní režim ovlivňuje, kolik času musí uplynou před zápisem záznamu a frekvence a množství zapsaných dat může mít vliv na náklady spojené s kanály streamování.

Režim připojení vynutí stavové operátory generovat výsledky až po dokončení stavových výsledků, což je alespoň tak dlouho, dokud se vodoznak zpozdí. Zpoždění vodoznaku 1 hour v režimu připojování výstupu znamená, že vaše záznamy mají alespoň 1 hodinu zpoždění, než jsou vysílány dále po proudu.

Režim aktualizace vede k jednomu zápisu pro každý trigger pro každou agregovanou hodnotu. Pokud se za každý zápis na každý záznam v jímce účtuje, může to být nákladné, pokud se záznamy aktualizují mnohokrát, než uplyne zpoždění vodoznaku.

Příklady konfigurace

Následující příklady kódu ukazují, jak nakonfigurovat výstupní režim pro streamované aktualizace do tabulek katalogu Unity:

Python

# Append output mode (default)
(df.writeStream
  .toTable("target_table")
)

# Append output mode (same as default behavior)
(df.writeStream
  .outputMode("append")
  .toTable("target_table")
)

# Update output mode
(df.writeStream
  .outputMode("update")
  .toTable("target_table")
)

# Complete output mode
(df.writeStream
  .outputMode("complete")
  .toTable("target_table")
)

Scala

// Append output mode (default)
df.writeStream
  .toTable("target_table")

// Append output mode (same as default behavior)
df.writeStream
  .outputMode("append")
  .toTable("target_table")

// Update output mode
df.writeStream
  .outputMode("update")
  .toTable("target_table")

// Complete output mode
df.writeStream
  .outputMode("complete")
  .toTable("target_table")

Viz dokumentace OSS pro PySpark DataStreamWriter.outputMode nebo Scala DataStreamWriter.outputMode.

Příklad stavových režimů streamování a výstupu

Následující příklad vám pomůže zjistit, jak výstupní režim komunikuje s vodoznaky pro stavové streamování.

Zvažte streamovací agregaci, která počítá celkové tržby generované každou hodinu v obchodě s 15minutovým zpožděním watermark. První mikrobatch zpracovává následující záznamy:

  • $15 v 2:40pm
  • $10 v 2:30pm
  • $30 v 3:10pm

V tomto okamžiku je vodoznak motoru 14:55, protože odečte 15 minut (zpoždění) od maximálního času (15:10). Operátor agregace streamování má ve svém stavu následující:

  • [2pm, 3pm]: 25 Kč
  • [3pm, 4pm]: 30 Kč

Následující tabulka popisuje, co by se stalo v každém výstupním režimu:

Výstupní režim Výsledek a důvod
Připojit Operátor agregace streamování negeneruje nic podřízeného. Důvodem je, že obě tato okna se můžou změnit, protože se nové hodnoty zobrazí s následnou aktivační událostí: vodoznak 2:55 udává, že záznamy po 2:55 můžou být stále doručeny a tyto záznamy můžou spadat do okna [2pm, 3pm] nebo do okna [3pm, 4pm].
Aktualizace Operátor generuje oba záznamy, protože oba záznamy obdržely aktualizace.
Dokončit Operátor generuje všechny záznamy.

Předpokládejme, že stream přijímá ještě jeden záznam:

  • $20 v 3:20pm

Vodoznak se aktualizuje na 3:05, protože motor odečte 15 minut od 3:20. V tomto okamžiku má operátor agregace streamování ve svém stavu následující:

  • [2pm, 3pm]: 25 Kč
  • [3pm, 4pm]: 50 Kč

Následující tabulka popisuje, co by se stalo v každém výstupním režimu:

Výstupní režim Výsledek a důvod
Připojit Operátor agregace streamování zaznamenává, že časová značka 15:05 je větší než konec okna [2pm, 3pm]. Podle definice vodoznaku se už toto okno nemůže změnit, takže vygeneruje okno [2pm, 3pm].
Aktualizace Operátor agregace streamování vygeneruje okno [3pm, 4pm], protože hodnota stavu se změnila z $30 na $50.
Dokončit Operátor generuje všechny záznamy.

Následující souhrn shrnuje, jak se stavové operátory chovají v každém režimu připojení:

  • V režimu připojování zapište záznamy pouze jednou po zpoždění způsobeném vodoznakem.
  • V režimu aktualizace zapisujte záznamy, které se od předchozího triggeru změnily.
  • V úplném režimu zapisujte všechny záznamy vytvořené stavovým operátorem.