Vad är tillståndskänslig streaming?
En tillståndskänslig förfrågan för strömmande strukturerad bearbetning kräver inkrementella uppdateringar av information om mellanliggande tillstånd, medan en tillståndslös förfrågan för strömmande strukturerad bearbetning endast spårar information om vilka rader som har bearbetats från källa till mottagare.
Tillståndskänsliga operationer omfattar strömmande aggregering, strömmande dropDuplicates
, ström-till-ström-sammanslagningar och anpassade tillståndskänsliga applikationer.
Informationen om mellanliggande tillstånd som krävs för tillståndskänsliga frågor för strukturerad direktuppspelning kan leda till oväntade svarstider och produktionsproblem om de är felkonfigurerade.
I Databricks Runtime 13.3 LTS och senare kan du aktivera kontrollpunkter för ändringsloggar med RocksDB för att sänka varaktigheten för kontrollpunkter och svarstid från slutpunkt till slutpunkt för strukturerade strömningsarbetsbelastningar. Databricks rekommenderar att du aktiverar kontrollpunkter för ändringslogg för alla tillståndsfrågor i Structured Streaming. Se Aktivera kontrollpunkter för ändringsloggar.
Optimera tillståndskänsliga frågor för strukturerad direktuppspelning
Genom att hantera mellanliggande tillståndsinformation för tillståndskänsliga frågor för strukturerad direktuppspelning kan du förhindra oväntade svarstider och produktionsproblem.
Databricks rekommenderar:
- Använd beräkningsoptimerade instanser som arbetare.
- Ange antalet shuffle-partitioner till 1–2 gånger antalet kärnor i klustret.
- Ange konfigurationen för
spark.sql.streaming.noDataMicroBatches.enabled
tillfalse
i SparkSession. Detta hindrar den strömmande mikro-batchmotorn från att bearbeta mikro-batcher som inte innehåller data. Observera också att om du ställer in den här konfigurationen påfalse
kan det leda till tillståndskänsliga åtgärder som använder vattenstämplar eller tidsgränser för bearbetning och därför inte ger datautdata förrän nya data tas emot, snarare än omedelbart.
Databricks rekommenderar att du använder RocksDB med kontrollpunkter för ändringsloggar för att hantera tillståndet för tillståndskänsliga strömmar. Se Konfigurera RocksDB tillståndslagring i Azure Databricks.
Kommentar
Det går inte att ändra hanteringsschemat för tillstånd mellan omstarter av sökningar. Om en fråga har startats med standardhanteringen måste du starta om den från grunden med en ny kontrollpunktsplats för att ändra tillståndsarkivet.
Arbeta med flera tillståndskänsliga operatorer i strukturerad direktuppspelning
I Databricks Runtime 13.3 LTS och senare erbjuder Azure Databricks avancerat stöd för tillståndskänsliga operatörer i strukturerade strömningsarbetsbelastningar. Nu kan du länka flera tillståndskänsliga operatorer tillsammans, vilket innebär att du kan mata utdata från en åtgärd, till exempel en fönsterad aggregering, till en annan tillståndskänslig åtgärd, till exempel en koppling.
I Databricks Runtime 16.2 och senare kan du använda transformWithState
i arbetsbelastningar med flera tillståndskänsliga operatorer. Se Skapa ett anpassat tillståndskänsligt program.
I följande exempel visas flera mönster som du kan använda.
Viktigt!
Följande begränsningar finns när du arbetar med flera tillståndskänsliga operatorer:
- Äldre anpassade tillståndskänsliga operatorer (
FlatMapGroupWithState
ochapplyInPandasWithState
stöds inte. - Endast tilläggsläge för utdata stöds.
Kedjad tidsfönsteraggregering
Python
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Aggregering av tidsfönster i två olika strömmar följt av stream-stream-fönsteranslutning
Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Ström-ström tidsintervallkoppling följt av tidsfönsteraggregering
Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Ombalansering av tillstånd för strukturerad direktuppspelning
Tillståndsombalansering är aktiverat som standard för alla strömmande arbetsbelastningar i DLT. I Databricks Runtime 11.3 LTS och senare kan du ange följande konfigurationsalternativ i Spark-klusterkonfigurationen för att aktivera tillståndsombalansering:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
Ombalansering av tillstånd gynnar tillståndskänsliga pipelines för strukturerad direktuppspelning som genomgår klusterändringshändelser. Tillståndslösa strömningsåtgärder gynnas inte, oavsett om klusterstorlekarna ändras.
Kommentar
Automatiserad skalning av beräkningsresurser har begränsningar för att minska klusterstorleken för arbetsbelastningar med strukturerad strömning. Databricks rekommenderar att du använder DLT med förbättrad automatisk skalning för strömningsarbetsbelastningar. Se Optimera klusteranvändningen av DLT-pipelines med förbättrad automatisk skalning.
Händelser som ändrar klusterstorlek utlöser ombalansering av tillstånd. Mikrobatcher kan ha högre latens under ombalanseringsprocesser när tillståndet läses in från molnlagring till de nya exekutorerna.