Sdílet prostřednictvím


Co je stavové streamování?

Stavový dotaz strukturovaného streamování vyžaduje přírůstkové aktualizace průběžných informací o stavu, zatímco bezstavový dotaz strukturovaného streamování sleduje pouze informace o tom, které řádky byly zpracovány ze zdroje do jímky.

Stavové operace zahrnují agregaci streamování, streamování dropDuplicates, spojení datových proudů mapGroupsWithStatea flatMapGroupsWithState.

Informace o přechodném stavu vyžadované pro stavové dotazy strukturovaného streamování můžou vést k neočekávaným problémům s latencí a produkčním prostředím, pokud nejsou správně nakonfigurované.

Ve službě Databricks Runtime 13.3 LTS a novějších můžete povolit kontrolní body protokolu změn pomocí RocksDB a snížit dobu trvání kontrolního bodu a kompletní latenci pro úlohy strukturovaného streamování. Databricks doporučuje zapnout kontrolní body protokolu změn pro všechny stavové dotazy strukturovaného streamování. Viz Povolení vytváření kontrolních bodů protokolu změn.

Optimalizace stavových dotazů strukturovaného streamování

Správa průběžných informací o stavových dotazech strukturovaného streamování může pomoct zabránit neočekávaným problémům s latencí a produkčním prostředím.

Databricks doporučuje:

  • Používejte výpočetní instance jako pracovní procesy.
  • Nastavte počet oddílů náhodného prohazování na 1–2krát počet jader v clusteru.
  • Nastavte v SparkSession konfiguraci spark.sql.streaming.noDataMicroBatches.enabled na false. Tím zabráníte streamovacímu mikrodávkovému modulu zpracovávat mikrodávkové dávky, které neobsahují data. Všimněte si také, že nastavení této konfigurace na false může vést ke stavovým operacím, které využívají meze nebo časové limity zpracování, aby nezískuly výstup dat, dokud se nová data nedostanou místo okamžitého doručení.

Databricks doporučuje používat RocksDB s kontrolním bodem protokolu změn ke správě stavu stavových datových proudů. Viz Konfigurace úložiště stavů RocksDB v Azure Databricks.

Poznámka:

Schéma správy stavu nelze mezi restartováními dotazů změnit. To znamená, že pokud byl dotaz spuštěn s výchozí správou, nemůže se změnit bez spuštění dotazu od začátku s novým umístěním kontrolního bodu.

Práce s několika stavovými operátory ve strukturovaném streamování

V Databricks Runtime 13.3 LTS a novějších nabízí Azure Databricks pokročilou podporu stavových operátorů v úlohách strukturovaného streamování. Nyní můžete zřetězovat několik stavových operátorů, což znamená, že můžete předávat výstup operace, jako je okenní agregace, do jiné stavové operace, jako je sloučení.

Následující příklady ukazují několik vzorů, které můžete použít.

Důležité

Při práci s několika stavovými operátory existují následující omezení:

  • FlatMapGroupWithState není podporováno.
  • Podporuje se pouze režim výstupu připojení.

Agregace zřetězených časových intervalů

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Agregace časového okna ve dvou různých datových proudech následovaná spojením časového okna datových proudů.

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Spojení datových proudů podle časového intervalu následované agregací v časovém okně

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Vyrovnávání stavu pro strukturované streamování

Vyrovnávání stavu je ve výchozím nastavení povoleno pro všechny streamingové úlohy v Delta Live Tables. Ve službě Databricks Runtime 11.3 LTS a novějších můžete v konfiguraci clusteru Spark nastavit následující možnost konfigurace, abyste povolili vyrovnávání stavu:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Vyrovnávání stavu přináší stavové kanály strukturovaného streamování, které procházejí událostmi změny velikosti clusteru. Bezstavové operace streamování nemají prospěch bez ohledu na změnu velikosti clusteru.

Poznámka:

Automatické škálování výpočetních prostředků má omezení vertikálního snížení kapacity clusteru pro úlohy strukturovaného streamování. Databricks doporučuje používat tabulky Delta Live s vylepšeným automatickým škálováním pro úlohy streamování. Viz Optimalizace využití clusteru kanálů Delta Live Tables s vylepšeným automatickým škálováním.

Změna velikosti událostí clusteru způsobí opětovné vyrovnávání stavu, který se aktivuje. Při rebalancování událostí můžou mikrodávkové dávky mít vyšší latenci, protože se stav načítá z cloudového úložiště do nových exekutorů.

Zadání počátečního stavu pro mapGroupsWithState

Můžete zadat uživatelem definovaný počáteční stav pro stavové zpracování strukturovaného streamování pomocí flatMapGroupsWithStatenebo mapGroupsWithState. To vám umožní vyhnout se opětovnému zpracování dat při spuštění stavového datového proudu bez platného kontrolního bodu.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Příklad případu použití, který určuje počáteční stav operátoru flatMapGroupsWithState :

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Příklad případu použití, který určuje počáteční stav operátoru mapGroupsWithState :

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Testovat funkci mapGroupsWithState update

Rozhraní API TestGroupState umožňuje otestovat funkci aktualizace stavu používanou pro Dataset.groupByKey(...).mapGroupsWithState(...) a Dataset.groupByKey(...).flatMapGroupsWithState(...).

Funkce aktualizace stavu přebírá předchozí stav jako vstup pomocí objektu typu GroupState. Viz referenční dokumentace k Apache Spark GroupState. Příklad:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}