What is stateful streaming?
A stateful Structured Streaming query requires incremental updates to intermediate state information, whereas a stateless Structured Streaming query only tracks information about which rows have been processed from the source to the sink.
Stateful operations include streaming aggregation, streaming dropDuplicates, stream-stream joins, and custom stateful applications.
The intermediate state information required for stateful Structured Streaming queries can lead to unexpected latency and production problems if misconfigured.
In Databricks Runtime 13.3 LTS and above, you can enable changelog checkpointing with RocksDB to lower checkpoint duration and end-to-end latency for Structured Streaming workloads. Databricks recommends enabling changelog checkpointing for all Structured Streaming stateful queries. See Enable changelog checkpointing.
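As a sketch, changelog checkpointing is enabled through Spark configuration together with the RocksDB state store provider. The keys below follow the Databricks documentation; verify the exact keys and values for your Databricks Runtime version:

```
spark.sql.streaming.stateStore.providerClass com.databricks.sql.streaming.state.RocksDBStateStoreProvider
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled true
```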
Optimize stateful Structured Streaming queries
Managing the intermediate state information of stateful Structured Streaming queries can help prevent unexpected latency and production problems.
Databricks recommends:
- Use compute-optimized instances as workers.
- Set the number of shuffle partitions to 1-2 times the number of cores in the cluster.
- Set the `spark.sql.streaming.noDataMicroBatches.enabled` configuration to `false` in the SparkSession. This prevents the streaming micro-batch engine from processing micro-batches that do not contain data. Note that setting this configuration to `false` can prevent stateful operations that use watermarks or processing-time timeouts from emitting output until new data arrives, rather than immediately.
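The shuffle-partition guidance above can be expressed as a small helper; a minimal pure-Python sketch (the function name is hypothetical, and in practice you would apply the result with `spark.conf.set("spark.sql.shuffle.partitions", ...)`):

```python
def recommended_shuffle_partitions(cluster_cores: int, factor: int = 2) -> int:
    """Return a shuffle partition count of 1-2 times the cluster's total cores."""
    if not 1 <= factor <= 2:
        raise ValueError("factor should be between 1 and 2")
    return cluster_cores * factor

# For example, a cluster with 16 total worker cores:
partitions = recommended_shuffle_partitions(16)  # 32
```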
Databricks recommends using RocksDB with changelog checkpointing to manage the state for stateful streams. See Configure RocksDB state store on Azure Databricks.
Note
The state management scheme cannot be changed between query restarts. If a query was started with the default state management scheme, you must restart it from scratch with a new checkpoint location to change the state store provider.
Work with multiple stateful operators in Structured Streaming
In Databricks Runtime 13.3 LTS and above, Azure Databricks offers advanced support for stateful operators in Structured Streaming workloads. You can now chain multiple stateful operators together, meaning you can feed the output of an operation, such as a windowed aggregation, to another stateful operation, such as a join.
In Databricks Runtime 16.2 and above, you can use `transformWithState` in workloads with multiple stateful operators. See Build a custom stateful application.
The following examples demonstrate several patterns you can use.
Important
The following limitations exist when working with multiple stateful operators:
- Legacy custom stateful operators (`FlatMapGroupWithState` and `applyInPandasWithState`) are not supported.
- Only the append output mode is supported.
Chained time window aggregation
Python
from pyspark.sql.functions import window, window_time

words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
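The first aggregation above uses 10-minute windows that slide every 5 minutes, so each event lands in two overlapping windows. A pure-Python sketch of that bucketing (a hypothetical helper for illustration, not Spark's implementation):

```python
def sliding_windows(event_epoch_s: int, window_s: int = 600, slide_s: int = 300):
    """Return the (start, end) epoch-second windows containing the event."""
    # Window starts are multiples of the slide; walk back far enough to
    # cover every window that can still contain the event.
    earliest = event_epoch_s - event_epoch_s % slide_s - (window_s - slide_s)
    return [
        (start, start + window_s)
        for start in range(earliest, event_epoch_s + 1, slide_s)
        if start <= event_epoch_s < start + window_s
    ]

# An event at t=700s falls in the 300-900s and 600-1200s windows:
sliding_windows(700)  # [(300, 900), (600, 1200)]
```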
Time window aggregation in two different streams followed by stream-stream window join
Python
from pyspark.sql.functions import window

clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy($"impressionAdId", window($"impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
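The two aggregations join on the window column, so events match only when they fall into the same 1-hour tumbling bucket; a pure-Python sketch of that bucketing (hypothetical helper, not Spark's internal code):

```python
def hour_window(epoch_s: int) -> tuple:
    """Return the 1-hour tumbling window (start, end) containing the event."""
    start = epoch_s - epoch_s % 3600
    return (start, start + 3600)

# Two events join only if they produce the same window bucket:
hour_window(3700) == hour_window(5400)  # True: both in (3600, 7200)
```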
Stream-stream time interval join followed by time window aggregation
Python
from pyspark.sql.functions import expr, window

joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
import org.apache.spark.sql.functions.{expr, window}

val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
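The time-interval condition above admits a click only if it shares the impression's ad ID and arrives within one hour after the impression; a pure-Python sketch of the predicate (hypothetical helper, not the Spark join itself):

```python
from datetime import datetime, timedelta

def matches(click_ad_id, impression_ad_id, click_time, impression_time):
    """Mirror the join condition: same ad, click within 1 hour of the impression."""
    return (
        click_ad_id == impression_ad_id
        and impression_time <= click_time <= impression_time + timedelta(hours=1)
    )

imp = datetime(2024, 1, 1, 12, 0)
matches("ad1", "ad1", imp + timedelta(minutes=30), imp)  # True
matches("ad1", "ad1", imp + timedelta(minutes=90), imp)  # False: outside the interval
```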
State rebalancing for Structured Streaming
State rebalancing is enabled by default for all streaming workloads in Delta Live Tables. In Databricks Runtime 11.3 LTS and above, you can set the following configuration option in the Spark cluster configuration to enable state rebalancing:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
State rebalancing benefits stateful Structured Streaming pipelines that undergo cluster resizing events. Stateless streaming operations do not benefit, regardless of cluster size changes.
Note
Compute auto-scaling has limitations scaling down cluster size for Structured Streaming workloads. Databricks recommends using Delta Live Tables with enhanced autoscaling for streaming workloads. See Optimize the cluster utilization of Delta Live Tables pipelines with enhanced autoscaling.
Cluster resizing events trigger state rebalancing. Micro-batches might have higher latency during rebalancing events as the state loads from cloud storage to the new executors.