Udostępnij za pośrednictwem


Optymalizowanie przetwarzania stanowego w tabelach delta live przy użyciu znaków wodnych

Aby skutecznie zarządzać danymi przechowywanymi w stanie, należy używać znaków wodnych podczas wykonywania stanowego przetwarzania strumieniowego w tabelach Delta Live Tables, w tym agregacji, sprzężeń i deduplikacji. W tym artykule opisano sposób używania znaków wodnych w zapytaniach delta Live Tables i zawiera przykłady zalecanych operacji.

Uwaga

Aby zapewnić, że zapytania wykonujące agregacje są przetwarzane przyrostowo i nie są w pełni ponownie skompilowane przy każdej aktualizacji, należy użyć znaków wodnych.

Co to jest znak wodny?

W przypadku przetwarzania strumienia znak wodny to funkcja platformy Apache Spark, która umożliwia zdefiniowanie progu opartego na czasie przetwarzania danych podczas wykonywania operacji stanowych, takich jak agregacje. Dane przychodzące są przetwarzane do momentu osiągnięcia progu, w którym przedział czasu zdefiniowany przez próg zostanie zamknięty. Znaki wodne mogą służyć do unikania problemów podczas przetwarzania zapytań, głównie podczas przetwarzania większych zestawów danych lub długotrwałego przetwarzania. Te problemy mogą obejmować duże opóźnienie w tworzeniu wyników, a nawet błędów braku pamięci (OOM) ze względu na ilość danych przechowywanych w stanie podczas przetwarzania. Ponieważ dane przesyłane strumieniowo są z natury nieurządkowane, znaki wodne obsługują również prawidłowe obliczanie operacji, takich jak agregacje okien czasowych.

Aby dowiedzieć się więcej na temat używania znaków wodnych w przetwarzaniu strumienia, zobacz Watermarking in Apache Spark Structured Streaming (Przesyłanie strumieniowe ze strukturą platformy Apache Spark) i Apply watermarks to control data processing thresholds (Stosowanie znaków wodnych w celu kontrolowania progów przetwarzania danych).

Jak zdefiniować znak wodny?

Możesz zdefiniować znak wodny, określając pole znacznika czasu i wartość reprezentującą próg czasu dla danych opóźnionych do odebrania. Dane są uznawane za opóźnione, jeśli docierają po zdefiniowanym progu czasu. Jeśli na przykład próg jest zdefiniowany jako 10 minut, rekordy przychodzące po progu 10 minut mogą zostać usunięte.

Ponieważ rekordy dostarczane po zdefiniowanym progu mogą zostać porzucone, wybranie progu spełniającego wymagania dotyczące opóźnień i poprawności jest ważne. Wybranie mniejszego progu powoduje, że rekordy są emitowane wcześniej, ale także oznacza, że późne rekordy mogą zostać porzucone. Większy próg oznacza dłuższe oczekiwanie, ale prawdopodobnie większą kompletność danych. Ze względu na większy rozmiar stanu większy próg może również wymagać dodatkowych zasobów obliczeniowych. Ponieważ wartość progowa zależy od wymagań dotyczących danych i przetwarzania, testowanie i monitorowanie przetwarzania jest ważne, aby określić optymalny próg.

Funkcja w języku Python służy withWatermark() do definiowania znaku wodnego. W języku SQL użyj klauzuli WATERMARK , aby zdefiniować znak wodny:

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

Używanie znaków wodnych ze sprzężeniami strumienia strumienia

W przypadku sprzężeń strumienia należy zdefiniować znak wodny po obu stronach sprzężenia i klauzulę interwału czasu. Ponieważ każde źródło sprzężenia ma niekompletny widok danych, klauzula interwału czasu jest wymagana, aby poinformować aparat przesyłania strumieniowego, gdy nie można wykonać dalszych dopasowań. Klauzula interwału czasu musi używać tych samych pól używanych do definiowania znaków wodnych.

Ze względu na to, że każdy strumień wymaga różnych progów dla znaków wodnych, strumienie nie muszą mieć tych samych progów. Aby uniknąć brakujących danych, aparat przesyłania strumieniowego utrzymuje jeden globalny znak wodny na podstawie najwolniejszego strumienia.

Poniższy przykład łączy strumień wyświetleń reklam i strumień kliknięć użytkowników na reklamach. W tym przykładzie kliknięcie musi nastąpić w ciągu 3 minut od wyświetlenia. Po upływie 3-minutowego interwału czasu wiersze ze stanu, którego nie można już dopasować, zostaną porzucone.

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

Wykonywanie agregacji okiennych za pomocą znaków wodnych

Typową operacją stanową na danych przesyłanych strumieniowo jest agregacja okienna. Agregacje okienne są podobne do agregacji pogrupowanych, z tą różnicą, że wartości agregujące są zwracane dla zestawu wierszy będących częścią zdefiniowanego okna.

Okno można zdefiniować jako określoną długość, a operację agregacji można wykonać na wszystkich wierszach, które są częścią tego okna. Przesyłanie strumieniowe platformy Spark obsługuje trzy typy okien:

  • Okna stałoczasowe (stałe): szereg stałych rozmiarów, nienakładających się i ciągłych interwałów czasowych. Rekord wejściowy należy tylko do jednego okna.
  • Okna przesuwne: Podobnie jak okna wirujące, okna przesuwne są o stałym rozmiarze, ale okna mogą się nakładać, a rekord może znajdować się w wielu oknach.

Gdy dane docierają obok końca okna oraz długość znaku wodnego, żadne nowe dane nie są akceptowane dla okna, wynik agregacji jest emitowany, a stan okna zostanie porzucony.

Poniższy przykład oblicza sumę wyświetleń co 5 minut przy użyciu stałego okna. W tym przykładzie klauzula select używa aliasu impressions_window, a następnie samo okno jest definiowane jako część klauzuli GROUP BY . Okno musi być oparte na tej samej kolumnie znacznika czasu co znak wodny— kolumna clickTimestamp w tym przykładzie.

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Podobny przykład w języku Python do obliczania zysku w godzinach stałych okien:

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

Deduplikacja rekordów przesyłania strumieniowego

Przesyłanie strumieniowe ze strukturą ma dokładnie jednokrotne gwarancje przetwarzania, ale nie automatycznie deduplikuje rekordów ze źródeł danych. Na przykład ponieważ wiele kolejek komunikatów ma co najmniej raz gwarancje, podczas odczytu z jednej z tych kolejek komunikatów należy oczekiwać zduplikowanych rekordów. Za pomocą dropDuplicatesWithinWatermark() funkcji można deduplikować rekordy w dowolnym określonym polu, usuwając duplikaty ze strumienia, nawet jeśli niektóre pola różnią się (np. czas zdarzenia lub czas przybycia). Aby użyć dropDuplicatesWithinWatermark() funkcji, należy określić znak wodny. Wszystkie zduplikowane dane, które docierają do zakresu czasu określonego przez znak wodny, są porzucane.

Uporządkowane dane są ważne, ponieważ dane poza kolejnością powodują nieprawidłowe skoki wartości limitu. Następnie, gdy pojawią się starsze dane, są uznawane za opóźnione i porzucone. withEventTimeOrder Użyj opcji , aby przetworzyć początkową migawkę w kolejności na podstawie znacznika czasu określonego w znaku wodnym. Tę withEventTimeOrder opcję można zadeklarować w kodzie definiującym zestaw danych lub w ustawieniach potoku przy użyciu polecenia spark.databricks.delta.withEventTimeOrder.enabled. Na przykład:

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Uwaga

Ta opcja jest obsługiwana withEventTimeOrder tylko w języku Python.

W poniższym przykładzie dane są przetwarzane według clickTimestampparametrów , a rekordy przychodzące w ciągu 5 sekund od siebie, które zawierają zduplikowane userId kolumny i clickAdId są porzucane.

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("LIVE.rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

Optymalizowanie konfiguracji potoku pod kątem przetwarzania stanowego

Aby zapobiec problemom produkcyjnym i nadmiernemu opóźnieniu, usługa Databricks zaleca włączenie zarządzania stanem opartym na bazie bazy danych RocksDB na potrzeby przetwarzania strumienia stanowego, szczególnie jeśli przetwarzanie wymaga zaoszczędzenia dużej ilości stanu pośredniego.

Potoki bezserwerowe automatycznie zarządzają konfiguracjami magazynu stanów.

Zarządzanie stanem opartym na bazie bazy danych RocksDB można włączyć, ustawiając następującą konfigurację przed wdrożeniem potoku:

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

Aby dowiedzieć się więcej na temat magazynu stanów bazy danych RocksDB, w tym zaleceń dotyczących konfiguracji bazy danych RocksDB, zobacz Configure RocksDB state store on Azure Databricks (Konfigurowanie magazynu stanów bazy danych RocksDB w usłudze Azure Databricks).