Použití vodoznaků k řízení prahových hodnot zpracování dat
Tento článek představuje základní koncepty vodoznaků a poskytuje doporučení k používání vodoznaků v běžných stavových operacích streamování. Vodoznaky musíte použít u stavových operací streamování, abyste se vyhnuli nekonečnému rozšíření objemu dat uchovávaného ve stavu, což by mohlo vést k problémům s pamětí a zvýšit latenci zpracování během dlouhotrvajících operací streamování.
Co je vodoznak?
Strukturované streamování používá vodoznaky k řízení prahové hodnoty, jak dlouho se mají aktualizace pro danou entitu stavu dál zpracovávat. Mezi běžné příklady stavových entit patří:
- Agregace v časovém intervalu
- Jedinečné klíče v propojení mezi dvěma datovými proudy
Když deklarujete vodoznak, zadáte pole časového razítka a prahovou hodnotu vodoznaku v streamovacím datovém rámci. Při příchodu nových dat správce stavu sleduje poslední časové razítko v zadaném poli a zpracuje všechny záznamy v rámci prahové hodnoty zpoždění.
Následující příklad použije prahovou hodnotu meze 10 minut na počet oken:
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
V tomto příkladu:
- Sloupec
event_time
slouží k definování 10minutového vodoznaku a 5minutového přeskakujícího okna. - Pro každý
id
pozorovaný počet se shromažďuje počet pro každé nepřekrývající se 5minutové intervaly. - Informace o stavu se uchovávají pro každý počet, dokud konec okna není o 10 minut starší než nejnovější pozorované
event_time
.
Důležité
Prahové hodnoty meze zaručují, že se záznamy přicházející do zadané prahové hodnoty zpracovávají podle sémantiky definovaného dotazu. Zpožděné příchozí záznamy přicházející mimo zadanou prahovou hodnotu se stále můžou zpracovávat pomocí metrik dotazů, ale to není zaručené.
Jaký vliv mají vodoznaky na dobu zpracování a propustnost?
Vodoznaky komunikují s výstupními režimy a řídí, kdy se data zapisují do jímky. Vzhledem k tomu, že vodoznaky snižují celkové množství informací o stavu, které se mají zpracovat, je efektivní použití vodoznaků nezbytné pro efektivní propustnost stavového streamování.
Poznámka:
Pro všechny stavové operace se nepodporují všechny režimy výstupu.
Vodoznaky a výstupní režim pro agregace s okny
Následující tabulka podrobně popisuje zpracování dotazů s agregací u časového razítka s definovaným vodoznakem:
Výstupní režim | Chování |
---|---|
Připojit | Po překročení prahové hodnoty se řádky zapisují do cílové tabulky. Všechny zápisy jsou zpožděné na základě prahové hodnoty zpoždění. Starý stav agregace se po předání prahové hodnoty zahodí. |
Aktualizace | Řádky se zapisují do cílové tabulky při výpočtu výsledků a dají se aktualizovat a přepsat při příchodu nových dat. Starý stav agregace se po předání prahové hodnoty zahodí. |
Dokončit | Stav agregace se nezahodí. Cílová tabulka se přepíše s každou aktivační událostí. |
Vodoznaky a výstup pro spojení stream-stream
Spojení mezi více datovými proudy podporují pouze režim připojení a odpovídající záznamy se zapisují v každé dávce, kterou se zjistí. U vnitřních spojení doporučuje Databricks nastavit prahovou hodnotu meze u každého streamovaného zdroje dat. To umožňuje zahodit informace o stavu pro staré záznamy. Bez vodoznaků se strukturované streamování pokusí spojit každý klíč z obou stran spojení s každým triggerem.
Strukturované streamování má speciální sémantiku pro podporu vnějších spojení. Vodoznaky jsou povinné pro vnější spojení, protože označuje, kdy musí být klíč zapsán s hodnotou null po ukončení. Všimněte si, že zatímco vnější spojení (outer joins) mohou být užitečná pro zaznamenávání záznamů, které se během zpracování dat nikdy neshodují, protože spojení zapisují do tabulek pouze jako operace přidání, tato chybějící data se nezaznamenávají, dokud není překročen práh zpoždění.
Řízení prahové hodnoty pozdních dat pomocí více zásad vodoznaku ve strukturovaném streamování
Při práci s více vstupy strukturovaného streamování můžete nastavit více vodoznaků pro řízení prahových hodnot tolerance pro pozdní příchozí data. Konfigurace vodoznaků umožňuje řídit informace o stavu a ovlivnit latenci.
Streamovací dotaz může mít více vstupních datových proudů, které jsou sjednocovány nebo spojeny dohromady. Každý ze vstupních datových proudů může mít jinou prahovou hodnotu pozdních dat, která je potřeba tolerovat pro stavové operace. Zadejte tyto prahové hodnoty pro withWatermarks("eventTime", delay)
každý ze vstupních datových proudů. Následuje příklad dotazu s spojeními stream-stream.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Při spouštění dotazu strukturované streamování jednotlivě sleduje maximální dobu události zobrazenou v každém vstupním datovém proudu, vypočítá vodoznaky na základě odpovídajícího zpoždění a zvolí jeden globální vodoznak, který se má použít pro stavové operace. Ve výchozím nastavení je jako globální vodoznak zvoleno minimum, protože zajistí, že se žádná data omylem nezahodí jako příliš pozdě, pokud některý z datových proudů zaostává za ostatními (například kdyby jeden z datových proudů přestal přijímat data kvůli selháním zdrojového systému). Jinými slovy, globální vodoznak se bezpečně pohybuje rychlostí nejpomalejšího datového proudu a výstup dotazu se odpovídajícím způsobem zpozdí.
Pokud chcete získat rychlejší výsledky, můžete nastavit zásadu více vodoznaků tak, aby jako globální vodoznak zvolila maximální hodnotu tak, že nastavíte spark.sql.streaming.multipleWatermarkPolicy
konfigurace SQL na max
(výchozí hodnota je min
). Díky tomu se globální vodoznak pohybuje tempem nejrychlejšího datového proudu. Tato konfigurace však zahodí data z nejpomalejších datových proudů. Proto databricks doporučuje, abyste tuto konfiguraci používali uvážlivě.
Přetažení duplicit v rámci vodoznaku
V rámci Databricks Runtime 13.3 LTS a novějších můžete odstranit duplicity záznamů v rámci prahové hodnoty pomocí jedinečného identifikátoru.
Strukturované streamování poskytuje záruky zpracování přesně jednou, ale neodstraňuje automaticky záznamy ze zdrojů dat. Můžete použít dropDuplicatesWithinWatermark
k odstranění duplicitních dat u libovolného zadaného pole, což vám umožní odebrat duplicity ze streamu i v případě, že se některá pole liší (například čas události nebo čas příjezdu).
U duplicitních záznamů, které přicházejí do zadaného časového limitu, je garantováno, že budou zahozena. Tato záruka je striktní pouze v jednom směru a duplicitní záznamy, které přicházejí mimo zadanou prahovou hodnotu, mohou být také vyřazeny. Pokud chcete odebrat všechny duplicity, musíte nastavit prahovou hodnotu zpoždění vodoznaku delší než maximální rozdíly časových razítek mezi duplicitními událostmi.
Musíte zadat vodoznak, který použije metodu dropDuplicatesWithinWatermark
, jak je znázorněno v následujícím příkladu:
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])