Optimalizace stavového zpracování v rozdílových živých tabulkách pomocí vodoznaků
Pokud chcete efektivně spravovat data uložená ve stavu, použijte vodoznaky při provádění zpracování stavových datových proudů v rozdílových živých tabulkách, včetně agregací, spojení a odstranění duplicitních dat. Tento článek popisuje, jak používat vodoznaky v dotazech Delta Live Tables a obsahuje příklady doporučených operací.
Poznámka:
Aby se zajistilo, že dotazy, které provádějí agregace, se zpracovávají přírůstkově a nejsou plně přepočítané při každé aktualizaci, musíte použít vodoznaky.
Co je vodoznak?
Při zpracování datových proudů je vodoznak funkcí Apache Sparku, která může definovat prahovou hodnotu založenou na čase pro zpracování dat při provádění stavových operací, jako jsou agregace. Příchozí data se zpracovávají, dokud nedosáhnete prahové hodnoty. V tomto okamžiku je časové období definované prahovou hodnotou uzavřeno. Vodoznaky se dají použít k zabránění problémům při zpracování dotazů, zejména při zpracování větších datových sad nebo dlouhotrvajícího zpracování. Tyto problémy můžou zahrnovat vysokou latenci při vytváření výsledků a dokonce i chyby typu nedostatek paměti (OOM) kvůli množství dat, která se během zpracování uchovávají ve stavu. Vzhledem k tomu, že streamovaná data jsou ze své podstaty neuspořádaná, vodoznaky také podporují správné výpočty operací, jako jsou agregace časových intervalů.
Další informace o používání vodoznaků při zpracování datových proudů najdete v tématu Vodoznaky ve strukturovaném streamování Apache Sparku a použití vodoznaků pro řízení prahových hodnot zpracování dat.
Jak definujete vodoznak?
Vodoznak definujete tak, že zadáte pole časového razítka a hodnotu představující prahovou hodnotu času pro pozdní doručení dat . Data se považují za opožděná, pokud dorazí po definované prahové hodnotě času. Pokud je například prahová hodnota definována jako 10 minut, můžou se záznamy přicházející po 10minutové prahové hodnotě vynechat.
Vzhledem k tomu, že záznamy, které přicházejí po definované prahové hodnotě, mohou být vyřazeny, je důležité vybrat prahovou hodnotu, která splňuje vaše latence a požadavky na správnost. Pokud zvolíte menší prahovou hodnotu, vygenerují se záznamy dříve, ale také to znamená, že opožděné záznamy budou pravděpodobně vynechány. Větší prahová hodnota znamená delší čekání, ale možná větší úplnost dat. Kvůli větší velikosti stavu může větší prahová hodnota vyžadovat také další výpočetní prostředky. Vzhledem k tomu, že prahová hodnota závisí na požadavcích na data a zpracování, je důležité určit optimální prahovou hodnotu testováním a monitorováním zpracování.
Pomocí funkce v Pythonu withWatermark()
definujete vodoznak. V SQL použijte WATERMARK
klauzuli k definování vodoznaku:
Python
withWatermark("timestamp", "3 minutes")
SQL
WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES
Použití vodoznaků se spojeními stream-stream
U spojení stream-stream musíte definovat vodoznak na obou stranách spojení a klauzuli časového intervalu. Vzhledem k tomu, že každý zdroj spojení obsahuje neúplné zobrazení dat, je nutné, aby klauzule časového intervalu řekla modulu streamování, když není možné provést žádné další shody. Klauzule časového intervalu musí používat stejná pole, která slouží k definování vodoznaků.
Vzhledem k tomu, že každý datový proud vyžaduje pro vodoznaky různé prahové hodnoty, nemusí mít datové proudy stejné prahové hodnoty. Aby se zabránilo chybějícím datům, modul streamování udržuje jeden globální vodoznak založený na nejpomalejším datovém proudu.
Následující příklad spojí stream reklamních impresí a stream uživatelů klikne na reklamy. V tomto příkladu se kliknutí musí objevit do 3 minut od zobrazení. Po uplynutí 3minutového časového intervalu se řádky ze stavu, který už nelze shodovat, zahodí.
Python
import dlt
dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
clicksDf = (read_stream("rawClicks")
.withWatermark("clickTimestamp", "3 minutes")
)
impressionsDf = (read_stream("rawAdImpressions")
.withWatermark("impressionTimestamp", "3 minutes")
)
joinDf = impressionsDf.alias("imp").join(
clicksDf.alias("click"),
expr("""
imp.userId = click.userId AND
clickAdId = impressionAdId AND
clickTimestamp >= impressionTimestamp AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
"""),
"inner"
).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")
return joinDf
SQL
CREATE OR REFRESH STREAMING TABLE
silver.adImpressionClicks
AS SELECT
imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
(LIVE.bronze.rawAdImpressions)
WATERMARK
impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
(LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
imp.userId = click.userId
AND
clickAdId = impressionAdId
AND
clickTimestamp >= impressionTimestamp
AND
clickTimestamp <= impressionTimestamp + interval 3 minutes
Provádění agregací s okny s vodoznaky
Běžnou stavovou operací streamovaných dat je agregace s okny. Agregace s okny jsou podobné seskupeným agregacím s tím rozdílem, že agregační hodnoty se vrátí pro sadu řádků, které jsou součástí definovaného okna.
Okno lze definovat jako určitou délku a operaci agregace lze provést na všech řádcích, které jsou součástí tohoto okna. Streamování Sparku podporuje tři typy oken:
- Přeskakující (pevná) okna: Řada nepřekrývajících se a souvislých časových intervalů s pevnou velikostí. Vstupní záznam patří pouze do jednoho okna.
- Posuvná okna: Podobně jako přeskakující okna jsou posuvná okna pevná, ale okna se můžou překrývat a záznam může spadat do více oken.
Když data dorazí na konec okna a délku vodoznaku, nebudou pro okno přijata žádná nová data, vygeneruje se výsledek agregace a stav okna se zahodí.
Následující příklad vypočítá součet impresí každých 5 minut pomocí pevného okna. V tomto příkladu klauzule select používá alias impressions_window
a pak samotné okno je definováno jako součást GROUP BY
klauzule. Okno musí být založené na stejném sloupci časového razítka jako vodoznak, což je sloupec v tomto příkladu clickTimestamp
.
CREATE OR REFRESH STREAMING TABLE
gold.adImpressionSeconds
AS SELECT
impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
(LIVE.silver.adImpressionClicks)
WATERMARK
clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
impressionAdId, window(clickTimestamp, "5 minutes")
Podobný příklad v Pythonu pro výpočet zisku v hodinových pevných oknech:
import dlt
@dlt.table()
def profit_by_hour():
return (
spark.readStream.table("sales")
.withWatermark("timestamp", "1 hour")
.groupBy(window("timestamp", "1 hour").alias("time"))
.aggExpr("sum(profit) AS profit")
)
Odstranění duplicitních dat streamovaných záznamů
Strukturované streamování má přesně jednou záruku zpracování, ale automaticky neduplikuje záznamy ze zdrojů dat. Protože například mnoho front zpráv má alespoň jednou záruku, měly by se při čtení z jedné z těchto front zpráv očekávat duplicitní záznamy. Funkci můžete použít dropDuplicatesWithinWatermark()
k odstranění duplicitních záznamů u libovolného zadaného pole, odebrání duplicit ze streamu i v případě, že se některá pole liší (například čas události nebo čas příjezdu). Chcete-li použít dropDuplicatesWithinWatermark()
funkci, musíte zadat vodoznak. Všechna duplicitní data, která přicházejí v časovém rozsahu určeném vodoznakem, se zahodí.
Seřazená data jsou důležitá, protože data mimo pořadí způsobí nesprávné přeskakování hodnoty meze. Když přijdou starší data, považují se za opožděná a vyřazená. withEventTimeOrder
Pomocí možnosti můžete zpracovat počáteční snímek v pořadí na základě časového razítka zadaného ve vodoznaku. Možnost withEventTimeOrder
lze deklarovat v kódu definujícím datovou sadu nebo v nastavení kanálu pomocí spark.databricks.delta.withEventTimeOrder.enabled
. Příklad:
{
"spark_conf": {
"spark.databricks.delta.withEventTimeOrder.enabled": "true"
}
}
Poznámka:
Tato withEventTimeOrder
možnost se podporuje jenom v Pythonu.
V následujícím příkladu se data zpracovávají seřazená podle clickTimestamp
záznamů a záznamy přicházející do 5 sekund od sebe, které obsahují duplicitní userId
a clickAdId
sloupce, se zahodí.
clicksDedupDf = (
spark.readStream.table
.option("withEventTimeOrder", "true")
.table("LIVE.rawClicks")
.withWatermark("clickTimestamp", "5 seconds")
.dropDuplicatesWithinWatermark(["userId", "clickAdId"]))
Optimalizace konfigurace kanálu pro stavové zpracování
Aby se zabránilo problémům v produkčním prostředí a nadměrné latenci, doporučuje Databricks povolit správu stavu založené na RocksDB pro zpracování stavových datových proudů, zejména pokud zpracování vyžaduje úsporu velkého zprostředkujícího stavu.
Bezserverové kanály automaticky spravují konfigurace úložiště stavu.
Správu stavu na základě RocksDB můžete povolit nastavením následující konfigurace před nasazením kanálu:
{
"configuration": {
"spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
}
}
Další informace o úložišti stavů RocksDB, včetně doporučení konfigurace pro RocksDB, najdete v tématu Konfigurace úložiště stavů RocksDB v Azure Databricks.