Freigeben über


Was ist zustandsbehaftetes Streaming?

Eine zustandsbehaftete strukturierte Streaming-Abfrage erfordert inkrementelle Aktualisierungen von Zwischenzustandsinformationen, während eine zustandslose strukturierte Streaming-Abfrage nur Informationen darüber verfolgt, welche Datensätze von der Datenquelle zur Senke verarbeitet wurden.

Zustandsbehaftete Vorgänge umfassen Streaming-Aggregation, Streaming dropDuplicates, Stream-Stream-Joins, mapGroupsWithState und flatMapGroupsWithState.

Die für zustandsbehaftete strukturierte Streaming-Abfragen erforderlichen Zwischenstatusinformationen können zu unerwarteten Wartezeit- und Produktionsproblemen führen, wenn sie nicht ordnungsgemäß konfiguriert sind.

In Databricks Runtime 13.3 LTS und höher können Sie mit RocksDB Prüfpunkte im Änderungsprotokoll aktivieren, um die Prüfpunktdauer und End-to-End-Latenz für strukturierte Streamingworkloads zu verringern. Databricks empfiehlt, Änderungsprotokollprüfpunkte für alle zustandsbehafteten Abfragen von strukturiertem Streaming zu aktivieren. Weitere Informationen unter Aktivieren der Änderungsprotokollprüfpunkte.

Optimieren zustandsbehafteter strukturierter Streaming-Abfragen

Das Verwalten der Zwischenstatusinformationen von zustandsbehafteten strukturierten Streaming-Abfragen kann dazu beitragen, unerwartete Latenz- und Produktionsprobleme zu verhindern.

Databricks empfiehlt:

  • Verwenden Sie computeoptimierte Instanzen als Worker,
  • Legen Sie die Anzahl der Shufflepartitionen auf das ein- bis zweifache der Anzahl von Kernen im Cluster fest.
  • Legen Sie die Konfiguration von spark.sql.streaming.noDataMicroBatches.enabled in der SparkSession-Instanz auf false fest. Dadurch wird verhindert, dass die Streaming-Microbatch-Engine Microbatches verarbeitet, die keine Daten enthalten. Beachten Sie auch, dass das Festlegen dieser Konfiguration auf false zu zustandsbehafteten Vorgängen führen kann, die Wasserzeichen oder Verarbeitungstimeouts nutzen, um eine Datenausgabe erst dann zu erhalten, wenn neue Daten eintreffen, und nicht sofort.

Databricks empfiehlt die Verwendung von RocksDB mit Changelog-Prüfpunkten, um den Zustand für zustandsbehaftete Datenströme zu verwalten. Weitere Informationen finden Sie unter Konfigurieren des RocksDB-Statusspeichers auf Azure Databricks.

Hinweis

Das Zustandsverwaltungsschema kann zwischen Abfrageneustarts nicht geändert werden. Das heißt, wenn eine Abfrage mit der Standardverwaltung gestartet wurde, kann diese nicht geändert werden, ohne die Abfrage von Grund auf mit einem neuen Prüfpunktspeicherort zu starten.

Arbeiten mit mehreren zustandsbehafteten Operatoren im strukturierten Streaming

In Databricks Runtime 13.3 LTS und höher bietet Azure Databricks erweiterte Unterstützung für zustandsbehaftete Operatoren in Workloads für strukturiertes Streaming. Sie können jetzt mehrere zustandsbehaftete Operatoren miteinander verketten, was bedeutet, dass Sie die Ausgabe eines Vorgangs, z. B. einer Fensteraggregation, an einen anderen zustandsbehafteten Vorgang wie z. B. eine Verknüpfung übertragen können.

Die folgenden Beispiele veranschaulichen mehrere Muster, die Sie verwenden können.

Wichtig

Beim Arbeiten mit mehreren zustandsbehafteten Operatoren bestehen die folgenden Einschränkungen:

  • FlatMapGroupWithState wird nicht unterstützt.
  • Nur der Modus „Ausgabe anfügen“ wird unterstützt.

Aggregation verketteter Zeitfenster

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Aggregation von Zeitfenstern in zwei verschiedenen Streams, gefolgt von Stream-Stream-Fenster-Verknüpfung

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Stream-Stream-Zeitintervallverknüpfung gefolgt von Zeitfensteraggregation

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Statusausgleich für strukturiertes Streaming

Der Statusausgleich ist standardmäßig für alle Streamingworkloads in Delta Live Tables aktiviert. In Databricks Runtime 11.3 LTS und höher können Sie die folgende Konfigurationsoption in der Spark-Clusterkonfiguration festlegen, um das erneute Ausgleichen des Zustands zu aktivieren:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Von der Zustandsneuordnung profitieren Pipelines für strukturiertes Streaming mit Clustergrößenänderungen. Zustandslose Streamingvorgänge profitieren nicht davon, auch wenn sich die Größe des Clusters ändert.

Hinweis

Die automatische Computeskalierung hat Einschränkungen beim Herunterskalieren der Clustergröße für strukturierten Streaming-Workloads. Databricks empfiehlt die Verwendung von Delta Live-Tabellen mit erweiterter automatischer Skalierung für Streaming-Workloads. Weitere Informationen finden Sie unter Optimieren der Clusternutzung von Delta Live Tables-Pipelines mit verbesserter automatischer Skalierung.

Ereignisse zur Größenänderung des Clusters lösen eine Zustandsneuordnung aus. Microbatches können bei Ereignissen der Neuordnung eine höhere Wartezeit aufweisen, da der Zustand aus dem Cloudspeicher in die neuen Executors geladen wird.

Angeben des Anfangszustands für mapGroupsWithState

Sie können einen benutzerdefinierten Anfangszustand für die zustandsabhängige Verarbeitung von strukturiertem Streaming mit flatMapGroupsWithState oder mapGroupsWithState angeben. Auf diese Weise können Sie die erneute Verarbeitung von Daten vermeiden, wenn Sie einen zustandsabhängigen Stream ohne gültigen Prüfpunkt starten.

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U])

Beispiel für einen Anwendungsfall, bei dem ein Anfangszustand für den Operator flatMapGroupsWithState angegeben wird:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  Iterator((key, count.toString))
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Beispiel für einen Anwendungsfall, bei dem ein Anfangszustand für den Operator mapGroupsWithState angegeben wird:

val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
  state.update(new RunningCount(count))
  (key, count.toString)
}

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5)),
).toDS()

val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)

fruitStream
  .groupByKey(x => x)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)

Tester der Aktualisierungsfunktion mapGroupsWithState

Mit der TestGroupState-API können Sie die für Dataset.groupByKey(...).mapGroupsWithState(...) und Dataset.groupByKey(...).flatMapGroupsWithState(...) verwendete Zustandsaktualisierungsfunktion testen.

Die Zustandsaktualisierungsfunktion verwendet den vorherigen Zustand als Eingabe unter Verwendung eines Objekts vom Typ GroupState. Weitere Informationen finden Referenzdokumentation zu GroupState von Apache Spark. Beispiel:

import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  updateState(userId, actions, prevState)

  assert(prevState.hasUpdated)
}