Dela via


Vad är tillståndskänslig strömning?

En tillståndskänslig fråga för strukturerad direktuppspelning kräver inkrementella uppdateringar av information om mellanliggande tillstånd, medan en tillståndslös strukturerad direktuppspelningsfråga endast spårar information om vilka rader som har bearbetats från källan till mottagaren.

Tillståndskänsliga åtgärder omfattar strömningsaggregering, direktuppspelning dropDuplicates, stream-stream-kopplingar mapGroupsWithStateoch flatMapGroupsWithState.

Den mellanliggande tillståndsinformation som krävs för tillståndskänsliga frågor för strukturerad direktuppspelning kan leda till oväntade svarstider och produktionsproblem om de inte har konfigurerats korrekt.

I Databricks Runtime 13.3 LTS och senare kan du aktivera kontrollpunkter för ändringsloggar med RocksDB för att sänka varaktigheten för kontrollpunkter och svarstid från slutpunkt till slutpunkt för strukturerade strömningsarbetsbelastningar. Databricks rekommenderar att du aktiverar kontrollpunkter för ändringslogg för alla tillståndsfrågor i Structured Streaming. Se Aktivera kontrollpunkter för ändringsloggar.

Optimize tillståndskänsliga frågor för strukturerad direktuppspelning

Genom att hantera mellanliggande tillståndsinformation för tillståndskänsliga frågor för strukturerad direktuppspelning kan du förhindra oväntade svarstider och produktionsproblem.

Databricks rekommenderar:

  • Använd beräkningsoptimerade instanser som arbetare.
  • Sätt Set antalet shuffle-partitioner till 1–2 gånger så många som antalet kärnor i klustret.
  • Set konfigurationen spark.sql.streaming.noDataMicroBatches.enabled till false i SparkSession. Detta hindrar den strömmande mikrobatchmotorn från att bearbeta mikrobatch som inte innehåller data. Observera också att om du ställer in den här konfigurationen på false kan det leda till tillståndskänsliga åtgärder som utnyttjar vattenstämplar eller bearbetningstidstimeouter för att inte get datautdata förrän nya data tas emot i stället för omedelbart.

Databricks rekommenderar att du använder RocksDB med kontrollpunkter för ändringsloggar för att hantera tillståndet för tillståndskänsliga strömmar. Se Konfigurera RocksDB tillståndslagring i Azure Databricks.

Kommentar

Det går inte att ändra tillståndshanteringsschemat mellan omstarter av frågor. Om en fråga har startats med standardhanteringen kan den alltså inte ändras utan att frågan startas från grunden med en ny kontrollpunktsplats.

Arbeta med flera tillståndskänsliga operatorer i strukturerad direktuppspelning

I Databricks Runtime 13.3 LTS och senare erbjuder Azure Databricks avancerat stöd för tillståndskänsliga operatörer i strukturerade strömningsarbetsbelastningar. Nu kan du länka flera tillståndskänsliga operatorer, vilket innebär att du kan skicka utdata från en operation, till exempel en fönsterad aggregering, till en annan tillståndskänslig operation, till exempel en join.

I följande exempel visas flera mönster som du kan använda.

Viktigt!

Följande begränsningar finns när du arbetar med flera tillståndskänsliga operatorer:

  • FlatMapGroupWithState stöds inte.
  • Endast utdataläget för tillägg stöds.

Sammanlänkad tid window aggregering

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Tid window aggregering i två olika strömmar följt av stream-stream-windowjoin

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Tidsintervall för dataströmmen join följt av tid window sammansättning

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Ombalansering av tillstånd för strukturerad direktuppspelning

Tillståndsombalansering är aktiverat som standard för alla strömningsarbetsbelastningar i Delta Live Tables. I Databricks Runtime 11.3 LTS och senare kan du set följande konfigurationsalternativ i Spark-klusterkonfigurationen för att aktivera tillståndsombalansering:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Ombalansering av tillstånd gynnar tillståndskänsliga pipelines för strukturerad direktuppspelning som genomgår klusterändringshändelser. Tillståndslösa strömningsåtgärder gynnas inte, oavsett om klusterstorlekarna ändras.

Kommentar

Automatisk skalning av beräkning har begränsningar för att skala ned klusterstorleken för arbetsbelastningar med strukturerad direktuppspelning. Databricks rekommenderar att du använder Delta Live Tables med förbättrad automatisk skalning för strömningsarbetsbelastningar. Se Optimize klusterutnyttjandet för Delta Live Tables-pipelines med förbättrad automatisk skalning.

Om du ändrar storlek på kluster kan tillståndsombalansering utlösas. Under ombalanseringshändelser kan mikrobatcherna ha högre svarstid när tillståndet läses in från molnlagring till de nya körarna.

Ange inledande tillstånd för mapGroupsWithState

Du kan ange ett användardefinierat inledande tillstånd för tillståndskänslig bearbetning med structured streaming med eller flatMapGroupsWithStatemapGroupsWithState. På så sätt kan du undvika att bearbeta data när du startar en tillståndskänslig ström utan en giltig kontrollpunkt.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Exempel på användningsfall som anger ett initialt tillstånd för operatorn flatMapGroupsWithState :

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Exempel på användningsfall som anger ett initialt tillstånd för operatorn mapGroupsWithState :

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Testa funktionen mapGroupsWithStateupdate

Med TestGroupState-API:et kan du testa tillståndet update funktion som används för Dataset.groupByKey(...).mapGroupsWithState(...) och Dataset.groupByKey(...).flatMapGroupsWithState(...).

Funktionen tillstånd update tar det tidigare tillståndet som indata med hjälp av ett objekt av typen GroupState. Se referensdokumentationen för Apache Spark GroupState. Till exempel:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}