Co je stavové streamování?
Stavový dotaz strukturovaného streamování vyžaduje přírůstkové aktualizace průběžných informací o stavu, zatímco bezstavový dotaz strukturovaného streamování sleduje pouze informace o tom, které řádky byly zpracovány ze zdroje do jímky.
Stavové operace zahrnují agregaci streamování, streamování dropDuplicates
, spojení datových proudů mapGroupsWithState
a flatMapGroupsWithState
.
Informace o přechodném stavu vyžadované pro stavové dotazy strukturovaného streamování můžou vést k neočekávaným problémům s latencí a produkčním prostředím, pokud nejsou správně nakonfigurované.
Ve službě Databricks Runtime 13.3 LTS a novějších můžete povolit kontrolní body protokolu změn pomocí RocksDB a snížit dobu trvání kontrolního bodu a kompletní latenci pro úlohy strukturovaného streamování. Databricks doporučuje zapnout kontrolní body protokolu změn pro všechny stavové dotazy strukturovaného streamování. Viz Povolení vytváření kontrolních bodů protokolu změn.
Optimalizace stavových dotazů strukturovaného streamování
Správa průběžných informací o stavových dotazech strukturovaného streamování může pomoct zabránit neočekávaným problémům s latencí a produkčním prostředím.
Databricks doporučuje:
- Používejte výpočetní instance jako pracovní procesy.
- Nastavte počet oddílů náhodného prohazování na 1–2krát počet jader v clusteru.
- Nastavte v SparkSession konfiguraci
spark.sql.streaming.noDataMicroBatches.enabled
nafalse
. Tím zabráníte streamovacímu mikrodávkovému modulu zpracovávat mikrodávkové dávky, které neobsahují data. Všimněte si také, že nastavení této konfigurace nafalse
může vést ke stavovým operacím, které využívají meze nebo časové limity zpracování, aby nezískuly výstup dat, dokud se nová data nedostanou místo okamžitého doručení.
Databricks doporučuje používat RocksDB s kontrolním bodem protokolu změn ke správě stavu stavových datových proudů. Viz Konfigurace úložiště stavů RocksDB v Azure Databricks.
Poznámka:
Schéma správy stavu nelze mezi restartováními dotazů změnit. To znamená, že pokud byl dotaz spuštěn s výchozí správou, nemůže se změnit bez spuštění dotazu od začátku s novým umístěním kontrolního bodu.
Práce s několika stavovými operátory ve strukturovaném streamování
V Databricks Runtime 13.3 LTS a novějších nabízí Azure Databricks pokročilou podporu stavových operátorů v úlohách strukturovaného streamování. Nyní můžete zřetězovat několik stavových operátorů, což znamená, že můžete předávat výstup operace, jako je okenní agregace, do jiné stavové operace, jako je sloučení.
Následující příklady ukazují několik vzorů, které můžete použít.
Důležité
Při práci s několika stavovými operátory existují následující omezení:
-
FlatMapGroupWithState
není podporováno. - Podporuje se pouze režim výstupu připojení.
Agregace zřetězených časových intervalů
Python
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Agregace časového okna ve dvou různých datových proudech následovaná spojením časového okna datových proudů.
Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Spojení datových proudů podle časového intervalu následované agregací v časovém okně
Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Vyrovnávání stavu pro strukturované streamování
Vyrovnávání stavu je ve výchozím nastavení povoleno pro všechny streamingové úlohy v Delta Live Tables. Ve službě Databricks Runtime 11.3 LTS a novějších můžete v konfiguraci clusteru Spark nastavit následující možnost konfigurace, abyste povolili vyrovnávání stavu:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
Vyrovnávání stavu přináší stavové kanály strukturovaného streamování, které procházejí událostmi změny velikosti clusteru. Bezstavové operace streamování nemají prospěch bez ohledu na změnu velikosti clusteru.
Poznámka:
Automatické škálování výpočetních prostředků má omezení vertikálního snížení kapacity clusteru pro úlohy strukturovaného streamování. Databricks doporučuje používat tabulky Delta Live s vylepšeným automatickým škálováním pro úlohy streamování. Viz Optimalizace využití clusteru kanálů Delta Live Tables s vylepšeným automatickým škálováním.
Změna velikosti událostí clusteru způsobí opětovné vyrovnávání stavu, který se aktivuje. Při rebalancování událostí můžou mikrodávkové dávky mít vyšší latenci, protože se stav načítá z cloudového úložiště do nových exekutorů.
Zadání počátečního stavu pro mapGroupsWithState
Můžete zadat uživatelem definovaný počáteční stav pro stavové zpracování strukturovaného streamování pomocí flatMapGroupsWithState
nebo mapGroupsWithState
. To vám umožní vyhnout se opětovnému zpracování dat při spuštění stavového datového proudu bez platného kontrolního bodu.
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
Příklad případu použití, který určuje počáteční stav operátoru flatMapGroupsWithState
:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Příklad případu použití, který určuje počáteční stav operátoru mapGroupsWithState
:
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
Testovat funkci mapGroupsWithState
update
Rozhraní API TestGroupState
umožňuje otestovat funkci aktualizace stavu používanou pro Dataset.groupByKey(...).mapGroupsWithState(...)
a Dataset.groupByKey(...).flatMapGroupsWithState(...)
.
Funkce aktualizace stavu přebírá předchozí stav jako vstup pomocí objektu typu GroupState
. Viz referenční dokumentace k Apache Spark GroupState. Příklad:
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}