Udostępnij za pośrednictwem


Co to jest przesyłanie strumieniowe stanowe?

Stanowe zapytanie przesyłania strumieniowego ze strukturą wymaga przyrostowych aktualizacji informacji o stanie pośrednim, natomiast bezstanowe zapytanie przesyłania strumieniowego ze strukturą śledzi tylko informacje o tym, które wiersze zostały przetworzone ze źródła do ujścia.

Operacje stanowe obejmują agregację przesyłania strumieniowego, sprzężenia dropDuplicatesstrumienia, mapGroupsWithStatei flatMapGroupsWithState.

Informacje o stanie pośrednim wymagane dla stanowych zapytań przesyłania strumieniowego ze strukturą mogą prowadzić do nieoczekiwanych opóźnień i problemów produkcyjnych, jeśli nie zostały prawidłowo skonfigurowane.

W środowisku Databricks Runtime 13.3 LTS i nowszym można włączyć tworzenie punktów kontrolnych dziennika zmian za pomocą bazy danych RocksDB, aby zmniejszyć czas trwania punktu kontrolnego i kompleksowe opóźnienie obciążeń przesyłania strumieniowego ze strukturą. Usługa Databricks zaleca włączenie punktów kontrolnych dziennika zmian dla wszystkich zapytań stanowych przesyłania strumieniowego ze strukturą. Zobacz Włączanie tworzenia punktów kontrolnych dziennika zmian.

Optymalizowanie stanowych zapytań przesyłania strumieniowego ze strukturą

Zarządzanie informacjami o stanie pośrednim zapytań przesyłania strumieniowego ze strukturą może pomóc w zapobieganiu nieoczekiwanym opóźnieniu i problemom produkcyjnym.

Usługa Databricks zaleca:

  • Użyj wystąpień zoptymalizowanych pod kątem obliczeń jako procesów roboczych.
  • Ustaw liczbę partycji mieszania na 1–2 razy więcej rdzeni w klastrze.
  • Ustaw konfigurację spark.sql.streaming.noDataMicroBatches.enabled na false wartość w usłudze SparkSession. Zapobiega to przetwarzaniu mikrosadowych mikrosadów, które nie zawierają danych. Należy również pamiętać false , że ustawienie tej konfiguracji może spowodować operacje stanowe, które wykorzystują limity czasu limitu czasu przetwarzania, aby nie pobierać danych wyjściowych do momentu odebrania nowych danych zamiast natychmiast.

Usługa Databricks zaleca używanie bazy danych RocksDB z punktami kontrolnymi dziennika zmian w celu zarządzania stanem strumieni stanowych. Zobacz Konfigurowanie magazynu stanów bazy danych RocksDB w usłudze Azure Databricks.

Uwaga

Nie można zmienić schematu zarządzania stanem między ponownymi uruchomieniami zapytań. Oznacza to, że jeśli zapytanie zostało uruchomione z domyślnym zarządzaniem, nie można go zmienić bez uruchamiania zapytania od podstaw przy użyciu nowej lokalizacji punktu kontrolnego.

Praca z wieloma operatorami stanowymi w strumieniu ze strukturą

W środowisku Databricks Runtime 13.3 LTS i nowszym usługa Azure Databricks oferuje zaawansowaną obsługę operatorów stanowych w obciążeniach przesyłania strumieniowego ze strukturą. Teraz można połączyć wiele operatorów stanowych, co oznacza, że można podać dane wyjściowe operacji, takie jak agregacja okienna do innej operacji stanowej, takiej jak sprzężenie.

W poniższych przykładach pokazano kilka wzorców, których można użyć.

Ważne

Podczas pracy z wieloma operatorami stanowymi istnieją następujące ograniczenia:

  • FlatMapGroupWithState nie jest obsługiwana.
  • Obsługiwany jest tylko tryb wyjściowy dołączania.

Agregacja przedziału czasu łańcuchowego

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Agregacja przedziału czasu w dwóch różnych strumieniach, a następnie sprzężenia okna strumienia

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Sprzężenia interwału czasu strumienia, po którym następuje agregacja przedziału czasu

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Ponowne równoważenie stanu dla przesyłania strumieniowego ze strukturą

Ponowne równoważenie stanu jest domyślnie włączone dla wszystkich obciążeń przesyłania strumieniowego w tabelach delta Live Tables. W środowisku Databricks Runtime 11.3 LTS i nowszym można ustawić następującą opcję konfiguracji w konfiguracji klastra Spark, aby włączyć ponowne równoważenie stanu:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Ponowne równoważenie stanu zapewnia stanowe potoki przesyłania strumieniowego ze strukturą, które przechodzą zdarzenia zmiany rozmiaru klastra. Operacje przesyłania strumieniowego bezstanowego nie przynoszą korzyści niezależnie od zmiany rozmiarów klastra.

Uwaga

Skalowanie automatyczne obliczeń ma ograniczenia skalowania w dół rozmiaru klastra dla obciążeń przesyłania strumieniowego ze strukturą. Usługa Databricks zaleca używanie tabel delta live z rozszerzonym skalowaniem automatycznym na potrzeby obciążeń przesyłania strumieniowego. Zobacz Optymalizowanie wykorzystania klastra potoków tabel na żywo delty przy użyciu rozszerzonego skalowania automatycznego.

Zdarzenia zmiany rozmiaru klastra powodują ponowne równoważenie stanu do wyzwolenia. Podczas ponownego równoważenia zdarzeń mikrosady mogą mieć większe opóźnienie, ponieważ stan jest ładowany z magazynu w chmurze do nowych funkcji wykonawczych.

Określ stan początkowy dla mapGroupsWithState

Możesz określić stan początkowy zdefiniowany przez użytkownika dla przetwarzania stanowego przesyłania strumieniowego ze strukturą przy użyciu polecenia flatMapGroupsWithStatelub mapGroupsWithState. Dzięki temu można uniknąć ponownego przetwarzania danych podczas uruchamiania strumienia stanowego bez prawidłowego punktu kontrolnego.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Przykładowy przypadek użycia określający początkowy stan flatMapGroupsWithState operatora:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Przykładowy przypadek użycia określający początkowy stan mapGroupsWithState operatora:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

mapGroupsWithState Testowanie funkcji update

Interfejs TestGroupState API umożliwia przetestowanie funkcji aktualizacji stanu używanej w systemach Dataset.groupByKey(...).mapGroupsWithState(...) i Dataset.groupByKey(...).flatMapGroupsWithState(...).

Funkcja aktualizacji stanu przyjmuje poprzedni stan jako dane wejściowe przy użyciu obiektu typu GroupState. Zapoznaj się z dokumentacją referencyjną platformy Apache Spark GroupState. Na przykład:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}