Sdílet prostřednictvím


Optimalizace stavového zpracování v rozdílových živých tabulkách pomocí vodoznaků

Pokud chcete efektivně spravovat data uložená ve stavu, použijte vodoznaky při provádění zpracování stavových datových proudů v rozdílových živých tabulkách, včetně agregací, spojení a odstranění duplicitních dat. Tento článek popisuje, jak používat vodoznaky v dotazech Delta Live Tables a obsahuje příklady doporučených operací.

Poznámka:

Aby se zajistilo, že dotazy, které provádějí agregace, se zpracovávají přírůstkově a nejsou plně přepočítané při každé aktualizaci, musíte použít vodoznaky.

Co je vodoznak?

Při zpracování datových proudů je vodoznak funkcí Apache Sparku, která může definovat prahovou hodnotu založenou na čase pro zpracování dat při provádění stavových operací, jako jsou agregace. Příchozí data se zpracovávají, dokud nedosáhnete prahové hodnoty. V tomto okamžiku je časové období definované prahovou hodnotou uzavřeno. Vodoznaky se dají použít k zabránění problémům při zpracování dotazů, zejména při zpracování větších datových sad nebo dlouhotrvajícího zpracování. Tyto problémy můžou zahrnovat vysokou latenci při vytváření výsledků a dokonce i chyby typu nedostatek paměti (OOM) kvůli množství dat, která se během zpracování uchovávají ve stavu. Vzhledem k tomu, že streamovaná data jsou ze své podstaty neuspořádaná, vodoznaky také podporují správné výpočty operací, jako jsou agregace časových intervalů.

Další informace o používání vodoznaků při zpracování datových proudů najdete v tématu Vodoznaky ve strukturovaném streamování Apache Sparku a použití vodoznaků pro řízení prahových hodnot zpracování dat.

Jak definujete vodoznak?

Vodoznak definujete tak, že zadáte pole časového razítka a hodnotu představující prahovou hodnotu času pro pozdní doručení dat . Data se považují za opožděná, pokud dorazí po definované prahové hodnotě času. Pokud je například prahová hodnota definována jako 10 minut, můžou se záznamy přicházející po 10minutové prahové hodnotě vynechat.

Vzhledem k tomu, že záznamy, které přicházejí po definované prahové hodnotě, mohou být vyřazeny, je důležité vybrat prahovou hodnotu, která splňuje vaše latence a požadavky na správnost. Pokud zvolíte menší prahovou hodnotu, vygenerují se záznamy dříve, ale také to znamená, že opožděné záznamy budou pravděpodobně vynechány. Větší prahová hodnota znamená delší čekání, ale možná větší úplnost dat. Kvůli větší velikosti stavu může větší prahová hodnota vyžadovat také další výpočetní prostředky. Vzhledem k tomu, že prahová hodnota závisí na požadavcích na data a zpracování, je důležité určit optimální prahovou hodnotu testováním a monitorováním zpracování.

Pomocí funkce v Pythonu withWatermark() definujete vodoznak. V SQL použijte WATERMARK klauzuli k definování vodoznaku:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Použití vodoznaků se spojeními stream-stream

U spojení stream-stream musíte definovat vodoznak na obou stranách spojení a klauzuli časového intervalu. Vzhledem k tomu, že každý zdroj spojení obsahuje neúplné zobrazení dat, je nutné, aby klauzule časového intervalu řekla modulu streamování, když není možné provést žádné další shody. Klauzule časového intervalu musí používat stejná pole, která slouží k definování vodoznaků.

Vzhledem k tomu, že každý datový proud vyžaduje pro vodoznaky různé prahové hodnoty, nemusí mít datové proudy stejné prahové hodnoty. Aby se zabránilo chybějícím datům, modul streamování udržuje jeden globální vodoznak založený na nejpomalejším datovém proudu.

Následující příklad spojí stream reklamních impresí a stream uživatelů klikne na reklamy. V tomto příkladu se kliknutí musí objevit do 3 minut od zobrazení. Po uplynutí 3minutového časového intervalu se řádky ze stavu, který už nelze shodovat, zahodí.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Provádění agregací s okny s vodoznaky

Běžnou stavovou operací streamovaných dat je agregace s okny. Agregace s okny jsou podobné seskupeným agregacím s tím rozdílem, že agregační hodnoty se vrátí pro sadu řádků, které jsou součástí definovaného okna.

Okno lze definovat jako určitou délku a operaci agregace lze provést na všech řádcích, které jsou součástí tohoto okna. Streamování Sparku podporuje tři typy oken:

  • Přeskakující (pevná) okna: Řada nepřekrývajících se a souvislých časových intervalů s pevnou velikostí. Vstupní záznam patří pouze do jednoho okna.
  • Posuvná okna: Podobně jako přeskakující okna jsou posuvná okna pevná, ale okna se můžou překrývat a záznam může spadat do více oken.

Když data dorazí na konec okna a délku vodoznaku, nebudou pro okno přijata žádná nová data, vygeneruje se výsledek agregace a stav okna se zahodí.

Následující příklad vypočítá součet impresí každých 5 minut pomocí pevného okna. V tomto příkladu klauzule select používá alias impressions_windowa pak samotné okno je definováno jako součást GROUP BY klauzule. Okno musí být založené na stejném sloupci časového razítka jako vodoznak, což je sloupec v tomto příkladu clickTimestamp .

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Podobný příklad v Pythonu pro výpočet zisku v hodinových pevných oknech:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Odstranění duplicitních dat streamovaných záznamů

Strukturované streamování má přesně jednou záruku zpracování, ale automaticky neduplikuje záznamy ze zdrojů dat. Protože například mnoho front zpráv má alespoň jednou záruku, měly by se při čtení z jedné z těchto front zpráv očekávat duplicitní záznamy. Funkci můžete použít dropDuplicatesWithinWatermark() k odstranění duplicitních záznamů u libovolného zadaného pole, odebrání duplicit ze streamu i v případě, že se některá pole liší (například čas události nebo čas příjezdu). Chcete-li použít dropDuplicatesWithinWatermark() funkci, musíte zadat vodoznak. Všechna duplicitní data, která přicházejí v časovém rozsahu určeném vodoznakem, se zahodí.

Seřazená data jsou důležitá, protože data mimo pořadí způsobí nesprávné přeskakování hodnoty meze. Když přijdou starší data, považují se za opožděná a vyřazená. withEventTimeOrder Pomocí možnosti můžete zpracovat počáteční snímek v pořadí na základě časového razítka zadaného ve vodoznaku. Možnost withEventTimeOrder lze deklarovat v kódu definujícím datovou sadu nebo v nastavení kanálu pomocí spark.databricks.delta.withEventTimeOrder.enabled. Příklad:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Poznámka:

Tato withEventTimeOrder možnost se podporuje jenom v Pythonu.

V následujícím příkladu se data zpracovávají seřazená podle clickTimestampzáznamů a záznamy přicházející do 5 sekund od sebe, které obsahují duplicitní userId a clickAdId sloupce, se zahodí.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("LIVE.rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Optimalizace konfigurace kanálu pro stavové zpracování

Aby se zabránilo problémům v produkčním prostředí a nadměrné latenci, doporučuje Databricks povolit správu stavu založené na RocksDB pro zpracování stavových datových proudů, zejména pokud zpracování vyžaduje úsporu velkého zprostředkujícího stavu.

Bezserverové kanály automaticky spravují konfigurace úložiště stavu.

Správu stavu na základě RocksDB můžete povolit nastavením následující konfigurace před nasazením kanálu:

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Další informace o úložišti stavů RocksDB, včetně doporučení konfigurace pro RocksDB, najdete v tématu Konfigurace úložiště stavů RocksDB v Azure Databricks.