Udostępnij za pośrednictwem


Co to jest przesyłanie strumieniowe stanowe?

Stanowe zapytanie przesyłania strumieniowego ze strukturą wymaga przyrostowych aktualizacji informacji o stanie pośrednim, natomiast bezstanowe zapytanie przesyłania strumieniowego ze strukturą śledzi tylko informacje o tym, które wiersze zostały przetworzone ze źródła do ujścia.

Operacje stanowe obejmują agregację przesyłania strumieniowego, przesyłanie strumieniowe dropDuplicates, sprzężenia strumienia i niestandardowe aplikacje stanowe.

Informacje o stanie pośrednim wymagane dla stanowych zapytań przesyłania strumieniowego ze strukturą mogą prowadzić do nieoczekiwanych opóźnień i problemów produkcyjnych w przypadku nieprawidłowej konfiguracji.

W środowisku Databricks Runtime 13.3 LTS i nowszym można włączyć tworzenie punktów kontrolnych dziennika zmian za pomocą bazy danych RocksDB, aby zmniejszyć czas trwania punktu kontrolnego i kompleksowe opóźnienie obciążeń przesyłania strumieniowego ze strukturą. Usługa Databricks zaleca włączenie punktów kontrolnych dziennika zmian dla wszystkich zapytań stanowych przesyłania strumieniowego ze strukturą. Zobacz Zapisywanie punktów kontrolnych dziennika zmian.

Optymalizacja stanowych zapytań Structured Streaming

Zarządzanie informacjami o stanie pośrednim stanowych zapytań Structured Streaming może pomóc w zapobieganiu nieoczekiwanym opóźnieniom i problemom produkcyjnym.

Usługa Databricks zaleca:

  • Użyj wystąpień zoptymalizowanych pod kątem obliczeń jako procesów roboczych.
  • Ustaw liczbę partycji mieszania na 1–2 razy liczbę rdzeni w klastrze.
  • Ustaw konfigurację spark.sql.streaming.noDataMicroBatches.enabled na false w usłudze SparkSession. Zapobiega to przetwarzaniu mikropakietów, które nie zawierają danych. Należy również pamiętać, że ustawienie tej konfiguracji na false może prowadzić do sytuacji, w których operacje stanowe używające znaków wodnych lub limitów czasu przetwarzania nie będą generować danych wyjściowych, dopóki nie pojawią się nowe dane, zamiast robić to natychmiast.

Databricks zaleca używanie RocksDB z dziennikiem zmian jako punktami kontrolnymi do zarządzania stanem strumieni stanowych. Zobacz Konfigurowanie magazynu stanów bazy danych RocksDB w usłudze Azure Databricks.

Uwaga

Nie można zmienić schematu zarządzania stanem między ponownymi uruchomieniami zapytań. Jeśli zapytanie zostało uruchomione z domyślnym zarządzaniem, należy uruchomić je od podstaw przy użyciu nowej lokalizacji punktu kontrolnego, aby zmienić magazyn stanów.

Praca z różnymi operatorami stanowymi w strumieniowaniu strukturalnym

W środowisku Databricks Runtime 13.3 LTS i nowszym usługa Azure Databricks oferuje zaawansowaną obsługę operatorów stanowych w obciążeniach przesyłania strumieniowego ze strukturą. Teraz można łączyć wiele operatorów stanowych razem, co oznacza, że możesz przekazać wynik operacji, takich jak agregacja okienna, do innej operacji stanowej, takiej jak sprzężenie.

W środowisku Databricks Runtime 16.2 lub nowszym można użyć transformWithState w obciążeniach z wieloma operatorami stanowymi. Zobacz Zbuduj niestandardową aplikację stanową.

W poniższych przykładach pokazano kilka wzorców, których można użyć.

Ważne

Podczas pracy z wieloma operatorami stanowymi istnieją następujące ograniczenia:

  • Starsze niestandardowe operatory stanowe (FlatMapGroupWithState i applyInPandasWithState nie są obsługiwane.
  • Obsługiwany jest tylko tryb wyjściowy dołączania.

Agregacja łańcuchowa okien czasowych

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Agregacja przedziału czasu w dwóch różnych strumieniach, po której następuje łączenie okna strumień-strumień

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Łączenie strumień-strumień w przedziale czasowym, po którym następuje agregacja w oknie czasowym

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Ponowne równoważenie stanu dla przesyłania strumieniowego ze strukturą

Rebalans stanu jest domyślnie włączony dla wszystkich obciążeń przesyłania strumieniowego w DLT. W środowisku Databricks Runtime 11.3 LTS i nowszym można ustawić następującą opcję konfiguracji w konfiguracji klastra Spark, aby włączyć ponowne równoważenie stanu:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Ponowne równoważenie stanu przynosi korzyści stanowym potokom o strukturze przesyłania strumieniowego, które podlegają zdarzeniom zmiany rozmiaru klastra. Bezstanowe operacje przesyłania strumieniowego nie przynoszą korzyści, niezależnie od zmiany rozmiarów klastra.

Uwaga

Automatyczne skalowanie obliczeniowe ma ograniczenia dotyczące zmniejszania rozmiaru klastrów dla obciążeń związanych z przesyłaniem strumieniowym w strukturze. Databricks zaleca używanie DLT z rozszerzonym autoskalowaniem dla obciążeń przesyłania strumieniowego. Zobacz Optymalizowanie wykorzystania klastra potoków DLT za pomocą rozszerzonego skalowania automatycznego.

Zdarzenia zmiany rozmiaru klastra wywołują ponowne równoważenie stanu. Mikrosady mogą mieć większe opóźnienie podczas ponownego równoważenia zdarzeń, ponieważ stan jest ładowany z magazynu w chmurze do nowych funkcji wykonawczych.