Udostępnij za pośrednictwem


Stosowanie znaków wodnych w celu kontrolowania progów przetwarzania danych

W tym artykule przedstawiono podstawowe pojęcia związane z znakami wodnymi i przedstawiono zalecenia dotyczące używania znaków wodnych w typowych operacjach przesyłania strumieniowego stanowego. Należy zastosować znaki wodne do stanowych operacji przesyłania strumieniowego, aby uniknąć nieskończonego rozszerzania ilości danych przechowywanych w stanie, co może powodować problemy z pamięcią i zwiększać opóźnienia przetwarzania podczas długotrwałych operacji przesyłania strumieniowego.

Co to jest znak wodny?

Przesyłanie strumieniowe ze strukturą używa znaków wodnych do kontrolowania progu czasu kontynuowania przetwarzania aktualizacji dla danej jednostki stanu. Typowe przykłady jednostek stanu to:

  • Agregacje w przedziale czasu.
  • Unikatowe klucze w sprzężeniach między dwoma strumieniami.

Podczas deklarowania znaku wodnego należy określić pole znacznika czasu i próg limitu czasu w ramce danych przesyłania strumieniowego. Po nadejściu nowych danych menedżer stanu śledzi najnowszy znacznik czasu w określonym polu i przetwarza wszystkie rekordy w ramach progu opóźnienia.

Poniższy przykład stosuje próg limitu 10 minut do liczby okien:

from pyspark.sql.functions import window

(df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
)

W tym przykładzie:

  • Kolumna event_time służy do definiowania 10-minutowego limitu i 5-minutowego okna wirowania.
  • Liczba jest zbierana dla każdego id obserwowanego dla każdego nienakładających się okien 5 minut.
  • Informacje o stanie są przechowywane dla każdej liczby do końca okna jest 10 minut starsze niż najnowsze zaobserwowane event_time.

Ważne

Progi znaku wodnego gwarantują, że rekordy przychodzące do określonego progu są przetwarzane zgodnie z semantyka zdefiniowanego zapytania. Późne przybycie rekordów przybywających poza określony próg może być nadal przetwarzane przy użyciu metryk zapytań, ale nie jest to gwarantowane.

Jak znaki wodne wpływają na czas przetwarzania i przepływność?

Znaki wodne współdziałają z trybami danych wyjściowych, aby kontrolować, kiedy dane są zapisywane w ujściu. Ze względu na to, że znaki wodne zmniejszają łączną ilość informacji o stanie, które mają być przetwarzane, efektywne użycie znaków wodnych jest niezbędne do wydajnej przepływności przesyłania strumieniowego stanowego.

Uwaga

Nie wszystkie tryby wyjściowe są obsługiwane dla wszystkich operacji stanowych.

Znaki wodne i tryb wyjściowy dla agregacji okiennych

Poniższa tabela zawiera szczegółowe informacje dotyczące przetwarzania zapytań z agregacją na sygnaturze czasowej ze zdefiniowanym znakiem wodnym:

Tryb danych wyjściowych Zachowanie
Dołączanie Wiersze są zapisywane w tabeli docelowej po osiągnięciu progu limitu. Wszystkie zapisy są opóźnione na podstawie progu opóźnienia. Stary stan agregacji jest porzucany po osiągnięciu progu.
Zaktualizuj Wiersze są zapisywane w tabeli docelowej w miarę obliczania wyników i można je aktualizować i zastępować w miarę nadejścia nowych danych. Stary stan agregacji jest porzucany po osiągnięciu progu.
Ukończ Stan agregacji nie jest porzucony. Tabela docelowa zostanie przepisana przy użyciu każdego wyzwalacza.

Znaki wodne i dane wyjściowe dla sprzężeń strumienia

Sprzężenia między wieloma strumieniami obsługują tylko tryb dołączania, a dopasowane rekordy są zapisywane w każdej partii, w której są odnajdywane. W przypadku sprzężeń wewnętrznych usługa Databricks zaleca ustawienie progu limitu dla każdego źródła danych przesyłania strumieniowego. Umożliwia to odrzucenie informacji o stanie dla starych rekordów. Bez znaków wodnych przesyłanie strumieniowe ze strukturą próbuje połączyć każdy klucz z obu stron sprzężenia z każdym wyzwalaczem.

Przesyłanie strumieniowe ze strukturą ma specjalne semantyki do obsługi sprzężeń zewnętrznych. Znak wodny jest obowiązkowy dla sprzężeń zewnętrznych, ponieważ wskazuje, kiedy klucz musi być zapisany z wartością null po przejściu niedopasowanym. Należy pamiętać, że chociaż sprzężenia zewnętrzne mogą być przydatne do rejestrowania rekordów, które nigdy nie są zgodne podczas przetwarzania danych, ponieważ sprzężenia zapisują się tylko w tabelach jako operacje dołączania, brakujące dane nie są rejestrowane do momentu osiągnięcia progu opóźnienia.

Kontrolowanie progu opóźnionych danych przy użyciu wielu zasad limitu w strumieniu ze strukturą

Podczas pracy z wieloma danymi wejściowymi przesyłania strumieniowego ze strukturą można ustawić wiele znaków wodnych, aby kontrolować progi tolerancji dla danych przychodzących późno. Konfigurowanie znaków wodnych pozwala kontrolować informacje o stanie i wpływać na opóźnienie.

Zapytanie przesyłane strumieniowo może mieć wiele strumieni wejściowych, które są połączone lub połączone. Każdy ze strumieni wejściowych może mieć inny próg opóźnionych danych, które muszą być tolerowane dla operacji stanowych. Określ te progi przy użyciu withWatermarks("eventTime", delay) dla każdego strumieni wejściowych. Poniżej przedstawiono przykładowe zapytanie ze sprzężeniami strumienia strumienia.

val inputStream1 = ...      // delays up to 1 hour
val inputStream2 = ...      // delays up to 2 hours

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

Podczas uruchamiania zapytania przesyłanie strumieniowe ze strukturą indywidualnie śledzi maksymalny czas zdarzenia widoczny w każdym strumieniu wejściowym, oblicza znaki wodne na podstawie odpowiedniego opóźnienia i wybiera jeden globalny znak wodny z nimi do użycia na potrzeby operacji stanowych. Domyślnie wartość minimalna jest wybierana jako globalny znak wodny, ponieważ gwarantuje, że żadne dane nie zostaną przypadkowo porzucone jako za późno, jeśli jeden ze strumieni spadnie za drugim (na przykład jeden ze strumieni przestanie odbierać dane z powodu niepowodzeń nadrzędnych). Innymi słowy, globalny znak wodny bezpiecznie porusza się w tempie najwolniejszego strumienia, a dane wyjściowe zapytania są odpowiednio opóźnione.

Jeśli chcesz uzyskać szybsze wyniki, możesz ustawić wiele zasad limitu, aby wybrać maksymalną wartość jako globalny znak wodny, ustawiając konfigurację spark.sql.streaming.multipleWatermarkPolicy SQL na max wartość (wartość domyślna to min). Dzięki temu globalny znak wodny porusza się w tempie najszybszego strumienia. Jednak ta konfiguracja usuwa dane z najwolniejszych strumieni. W związku z tym usługa Databricks zaleca, aby używać tej konfiguracji rozsądnie.

Upuszczanie duplikatów w znaku wodnym

W środowisku Databricks Runtime 13.3 LTS i nowszym można deduplikować rekordy w ramach progu limitu limitu, używając unikatowego identyfikatora.

Przesyłanie strumieniowe ze strukturą zapewnia gwarancje dokładnie jednokrotnego przetwarzania, ale nie automatycznie deduplikuje rekordów ze źródeł danych. Za pomocą dropDuplicatesWithinWatermark funkcji deduplikacji rekordów w dowolnym określonym polu można usunąć duplikaty ze strumienia, nawet jeśli niektóre pola różnią się (na przykład czas zdarzenia lub godzina przybycia).

Zduplikowane rekordy, które docierają do określonego limitu, muszą zostać usunięte. Ta gwarancja jest ścisła tylko w jednym kierunku, a zduplikowane rekordy, które docierają poza określony próg, również mogą zostać usunięte. Należy ustawić próg opóźnienia limitu dłuższego niż maksymalna różnica sygnatury czasowej między zduplikowanymi zdarzeniami, aby usunąć wszystkie duplikaty.

Aby użyć dropDuplicatesWithinWatermark metody , należy określić znak wodny, jak w poniższym przykładzie:

Python

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
(streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])
)

Scala

val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark(["guid"])