Build a custom stateful application
Important
This feature is in Public Preview in Databricks Runtime 16.2 and above.
You can build streaming applications using custom stateful operators to implement low-latency and near real-time solutions that use arbitrary stateful logic. Custom stateful operators unlock new operational use cases and patterns unavailable through traditional Structured Streaming processing.
Note
Databricks recommends using built-in Structured Streaming functionality for supported stateful operations such as aggregations, deduplication, and streaming joins. See What is stateful streaming?.
Databricks recommends using `transformWithState` over legacy operators for arbitrary state transformations. For documentation on the legacy `flatMapGroupsWithState` and `mapGroupsWithState` operators, see Legacy arbitrary stateful operators.
Requirements
The `transformWithState` operator and the related APIs and classes have the following requirements:
- Available in Databricks Runtime 16.2 and above.
- Compute must use dedicated or no-isolation access mode.
- You must use the RocksDB state store provider. Databricks recommends enabling RocksDB as part of the compute configuration.
Note
To enable the RocksDB state store provider for the current session, run the following:
```
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
```
What is transformWithState?
The `transformWithState` operator applies a custom stateful processor to a Structured Streaming query. You must implement a custom stateful processor to use `transformWithState`. Structured Streaming includes APIs for building your stateful processor using Python, Scala, or Java.
You use `transformWithState` to apply custom logic to a grouping key for records processed incrementally with Structured Streaming. The following describes the high-level design:
- Define one or more state variables.
- State information is persisted for each grouping key and can be accessed for each state variable according to user-defined logic.
- For each micro-batch processed, all records for the key are available as an iterator.
- Use built-in handles to control when and how records are emitted based on timers and user-defined conditions.
- State values support individual time-to-live (TTL) definitions, allowing for flexibility in managing state expiration and state size.
Important
PySpark uses the operator `transformWithStateInPandas` instead of `transformWithState`. Azure Databricks documentation uses `transformWithState` to describe functionality for both Python and Scala implementations.
The Scala and Python implementations of `transformWithState` and related APIs differ due to language specifics but provide the same functionality. Refer to language-specific examples and API documentation for your preferred programming language.
Built-in processing handles
You implement the core logic for your custom stateful application by writing handlers that use built-in handles.
- Handles provide the methods to interact with state values and timers, process incoming records, and emit records.
- Handlers define your custom event-driven logic.
Handles for each state type are implemented based on the underlying data structure, but each contains functionality to get, put, update, and delete records.
Handlers are implemented based on either events observed in input records or timers, using the following semantics:
- Define a handler using the `handleInputRows` method to control how data is processed, state is updated, and records are emitted for each micro-batch of records processed for the grouping key. See Handle input rows.
- Define a handler using the `handleExpiredTimer` method to use time-based thresholds to run logic whether or not additional records are processed for the grouping key. See Program timed events.
The following table compares the functional behaviors supported by these handlers:
| Behavior | `handleInputRows` | `handleExpiredTimer` |
|---|---|---|
| Get, put, update, or clear state values | Yes | Yes |
| Create or delete a timer | Yes | Yes |
| Emit records | Yes | Yes |
| Iterate over records in the current micro-batch | Yes | No |
| Trigger logic based on elapsed time | No | Yes |
You can combine `handleInputRows` and `handleExpiredTimer` to implement complex logic as needed.
For example, you could implement an application that uses `handleInputRows` to update state values for each micro-batch and set a timer for 10 seconds in the future. If no additional records are processed, you can use `handleExpiredTimer` to emit the current values in the state store. If new records are processed for the grouping key, you can clear the existing timer and set a new timer.
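The following is a minimal sketch of this pattern in Python. The processor name and the integer `value` column are hypothetical, and the sketch assumes the timer methods exposed on the handle in the PySpark API (`registerTimer`, `deleteTimer`, `listTimers`); processing-time timers require passing `timeMode="processingTime"` to `transformWithStateInPandas`.

```python
import pandas as pd
from typing import Iterator
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, LongType

class IdleFlushProcessor(StatefulProcessor):
    """Accumulates a running total per key and emits it after 10 seconds of inactivity."""

    def init(self, handle: StatefulProcessorHandle) -> None:
        self.handle = handle
        self.total = handle.getValueState("total", StructType([StructField("sum", LongType(), True)]))

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        current = self.total.get()[0] if self.total.exists() else 0
        for pdf in rows:
            current += int(pdf["value"].sum())  # assumes an integer "value" column in the input
        self.total.update((current,))
        # Replace any pending timer with a new one 10 seconds in the future.
        for expiry_ts in list(self.handle.listTimers()):
            self.handle.deleteTimer(expiry_ts)
        self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10_000)
        return iter([])  # emit nothing while the key is still active

    def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
        # No new records arrived for this key before the timer fired: emit and evict the state.
        if self.total.exists():
            yield pd.DataFrame({"id": [key[0]], "total": [self.total.get()[0]]})
            self.total.clear()
```

A query using this processor would declare an output schema with `id` and `total` columns and typically use `outputMode="Update"`.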
Custom state types
You can implement multiple state objects in a single stateful operator. The names you give to each state object persist in the state store, which you can access with the state store reader. If your state object uses a `StructType`, you provide names for each field in the struct while passing the schema. These names are also visible when reading the state store. See Read Structured Streaming state information.
The functionality provided by built-in classes and operators is meant to provide flexibility and extensibility, and implementation choices should be informed by the complete logic your application needs to run. For example, you might implement nearly identical logic using a `ValueState` grouped by fields `user_id` and `session_id`, or a `MapState` grouped by `user_id` where `session_id` is the key for the `MapState`. In this instance, a `MapState` might be the preferred implementation if logic needs to evaluate conditions across multiple `session_id` values.
The following sections describe the state types supported by `transformWithState`.
ValueState
For each grouping key, there is an associated value.
A value state can include complex types, such as a struct or tuple. When you update a `ValueState`, you implement logic to replace the entire value. The TTL for a value state resets when the value updates, but is not reset if a source key matching a `ValueState` is processed without updating the stored `ValueState`.
ListState
For each grouping key, there is an associated list.
A list state is a collection of values, each of which can include complex types. Each value in a list has its own TTL. You can add items to a list by appending individual items, appending a list of items, or overwriting the entire list with a `put`. Only the put operation is considered an update for resetting TTL.
MapState
For each grouping key, there is an associated map. Maps are the Apache Spark functional equivalent to a Python dict.
Important
Grouping keys describe the fields specified in the `GROUP BY` clause of a Structured Streaming query. Map states contain an arbitrary number of key-value pairs for a grouping key.
For example, if you group by `user_id` and want to define a map for each `session_id`, your grouping key is `user_id` and the key in your map is `session_id`.
A map state is a collection of distinct keys that each map to a value that can include complex types. Each key-value pair in a map has its own TTL. You can update the value of a specific key or remove a key and its value. You can return an individual value using its key, list all keys, list all values, or return an iterator to work with the complete set of key-value pairs in the map.
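As an illustration, the sketch below tracks per-session event counts keyed by `session_id` within a `user_id` grouping. The processor and column names are hypothetical; the map state methods shown (`containsKey`, `getValue`, `updateValue`, `keys`) follow the PySpark state API used elsewhere on this page, and keys and values are passed as tuples.

```python
import pandas as pd
from typing import Iterator
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle

class SessionCountProcessor(StatefulProcessor):
    """Grouping key: user_id. Map key: session_id. Map value: events seen per session."""

    def init(self, handle: StatefulProcessorHandle) -> None:
        # Key and value schemas for a map state are defined separately.
        self.sessions = handle.getMapState("sessions", "session_id string", "events int")

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            for session_id, events in pdf.groupby("session_id").size().items():
                map_key = (session_id,)  # map keys are tuples
                current = self.sessions.getValue(map_key)[0] if self.sessions.containsKey(map_key) else 0
                self.sessions.updateValue(map_key, (current + int(events),))  # values are tuples
        # Emit one row per user with the number of sessions currently tracked in the map.
        yield pd.DataFrame({
            "user_id": [key[0]],
            "active_sessions": [sum(1 for _ in self.sessions.keys())],
        })
```

A query using this processor would group by `user_id` and declare an output schema with `user_id` and `active_sessions` columns.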
Initialize a custom state variable
When you initialize your `StatefulProcessor`, you create a local variable for each state object that allows you to interact with state objects in your custom logic. State variables are defined and initialized by overriding the built-in `init` method in the `StatefulProcessor` class.
You define an arbitrary number of state objects using the `getValueState`, `getListState`, and `getMapState` methods while initializing your `StatefulProcessor`.
Each state object must have the following:
- A unique name
- A specified schema
  - In Python, the schema is specified explicitly.
  - In Scala, pass an `Encoder` to specify the state schema.
You can also provide an optional time-to-live (TTL) duration in milliseconds. If implementing a map state, you must provide a separate schema definition for the map keys and the values.
Note
Logic for how state information is queried, updated, and emitted is handled separately. See Use your state variables.
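For example, a Python `init` method might define one state variable of each type with explicit schemas and TTL durations. This is a sketch: the `ttlDurationMs` keyword and the SQL DDL string shorthand for schemas reflect the PySpark API, and the names and durations are illustrative.

```python
from typing import Iterator
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType

class TtlExampleProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        count_schema = StructType([StructField("count", IntegerType(), True)])
        # Each state variable needs a unique name, a schema, and optionally a TTL in milliseconds.
        self.counts = handle.getValueState("counts", count_schema, ttlDurationMs=10 * 60 * 1000)
        # Schemas can also be passed as SQL DDL strings.
        self.recent_values = handle.getListState("recentValues", "value int", ttlDurationMs=60 * 1000)
        # Map state takes separate schemas for its keys and its values.
        self.sessions = handle.getMapState(
            stateName="sessions",
            userKeySchema="session_id string",
            valueSchema="events int",
            ttlDurationMs=30 * 60 * 1000,
        )

    def handleInputRows(self, key, rows, timerValues) -> Iterator:
        return iter([])  # state handling logic goes here
```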
Example stateful application
The following demonstrates the basic syntax for defining and using a custom stateful processor with `transformWithState`, including example state variables for each supported type. For more examples, see Example stateful applications.
Note
Python uses tuples for all interactions with state values. This means that your Python code should pass values as tuples when using operations such as `put` and `update`, and expect to handle tuples when using `get`.
For example, if the schema for your value state is just a single integer, you would implement code like the following:
```python
current_value_tuple = value_state.get()  # Returns the value state as a tuple
current_value = current_value_tuple[0]   # Extracts the first item in the tuple
new_value = current_value + 1            # Calculate a new value
value_state.update((new_value,))         # Pass the new value formatted as a tuple
```
This is also true for items in a `ListState` or values in a `MapState`.
Python
```python
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator

spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

output_schema = StructType(
    [
        StructField("id", StringType(), True),
        StructField("countAsString", StringType(), True),
    ]
)

class SimpleCounterProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        value_state_schema = StructType([StructField("count", IntegerType(), True)])
        list_state_schema = StructType([StructField("count", IntegerType(), True)])
        self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
        self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
        # Schema can also be defined using strings and SQL DDL syntax
        self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        count = 0
        for pdf in rows:
            list_state_rows = [(120,), (20,)]  # A list of tuples
            self.list_state.put(list_state_rows)
            self.list_state.appendValue((111,))
            self.list_state.appendList(list_state_rows)
            pdf_count = pdf.count()
            count += pdf_count.get("value")
        self.value_state.update((count,))  # Count is passed as a tuple
        list_state_iter = self.list_state.get()
        list_state_value = next(list_state_iter)[0]
        value = count
        user_key = ("user_key",)
        if self.map_state.exists():
            if self.map_state.containsKey(user_key):
                value += self.map_state.getValue(user_key)[0]
        self.map_state.updateValue(user_key, (value,))  # Value is a tuple
        yield pd.DataFrame({"id": key, "countAsString": str(count)})

q = (df.groupBy("key")
    .transformWithStateInPandas(
        statefulProcessor=SimpleCounterProcessor(),
        outputStructType=output_schema,
        outputMode="Update",
        timeMode="None",
    )
    .writeStream...
)
```
Scala
```scala
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders, DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
  @transient private var countState: ValueState[Int] = _
  @transient private var listState: ListState[Int] = _
  @transient private var mapState: MapState[String, Int] = _

  override def init(
      outputMode: OutputMode,
      timeMode: TimeMode): Unit = {
    countState = getHandle.getValueState[Int]("countState",
      Encoders.scalaInt, TTLConfig.NONE)
    listState = getHandle.getListState[Int]("listState",
      Encoders.scalaInt, TTLConfig.NONE)
    mapState = getHandle.getMapState[String, Int]("mapState",
      Encoders.STRING, Encoders.scalaInt, TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, String)],
      timerValues: TimerValues): Iterator[(String, String)] = {
    var count = countState.getOption().getOrElse(0)
    for (row <- inputRows) {
      val listData = Array(120, 20)
      listState.put(listData)
      listState.appendValue(count)
      listState.appendList(listData)
      count += 1
    }
    val iter = listState.get()
    var listStateValue = 0
    if (iter.hasNext) {
      listStateValue = iter.next()
    }
    countState.update(count)
    var value = count
    val userKey = "userKey"
    if (mapState.exists()) {
      if (mapState.containsKey(userKey)) {
        value += mapState.getValue(userKey)
      }
    }
    mapState.updateValue(userKey, value)
    Iterator((key, count.toString))
  }
}

val q = spark
  .readStream
  .format("delta")
  .load("$srcDeltaTableDir")
  .as[(String, String)]
  .groupByKey(x => x._1)
  .transformWithState(
    new SimpleCounterProcessor(),
    TimeMode.None(),
    OutputMode.Update()
  )
  .writeStream...
```
StatefulProcessorHandle
PySpark includes the `StatefulProcessorHandle` class to provide access to functions that control how your user-defined Python code interacts with state information. You must always import the `StatefulProcessorHandle` and pass it as the `handle` variable when initializing a `StatefulProcessor`.
The `handle` variable ties the local variable in your Python class to the state variable.
Note
Scala uses the `getHandle` method.
Specify initial state
You can optionally provide an initial state to use with the first micro-batch. This can be useful when migrating an existing workflow to a new custom application, upgrading a stateful operator to change your schema or logic, or recovering from a failure that cannot be repaired automatically and requires manual intervention.
Note
Use the state store reader to query state information from an existing checkpoint. See Read Structured Streaming state information.
If you are converting an existing Delta table to a stateful application, read the table using `spark.read.table("table_name")` and pass the resulting DataFrame. You can optionally select or modify fields to conform to your new stateful application.
You provide an initial state using a DataFrame with the same grouping key schema as the input rows.
Note
Python uses `handleInitialState` to specify the initial state while defining a `StatefulProcessor`. Scala uses the distinct class `StatefulProcessorWithInitialState`.
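In Python, a sketch of this might look like the following. It extends the `SimpleCounterProcessor` example above, assumes a Delta table named `user_counts` with `key` and `count` columns, and assumes the `handleInitialState(key, initialState, timerValues)` signature and the `initialState` argument of `transformWithStateInPandas`; verify both against the API documentation for your runtime version.

```python
import pandas as pd

# Hypothetical: seed per-key counts from an existing Delta table.
initial_df = spark.read.table("user_counts")  # assumed columns: key, count

class CounterWithInitialState(SimpleCounterProcessor):
    def handleInitialState(self, key, initialState, timerValues) -> None:
        # Called once per grouping key during the first micro-batch, before any input rows.
        self.value_state.update((int(initialState["count"].iloc[0]),))

result_df = (df.groupBy("key")
    .transformWithStateInPandas(
        statefulProcessor=CounterWithInitialState(),
        outputStructType=output_schema,
        outputMode="Update",
        timeMode="None",
        initialState=initial_df.groupBy("key"),  # grouped by the same key as the streaming input
    ))
# Attach a sink with .writeStream to start the query.
```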
Use your state variables
Supported state objects provide methods for getting state, updating existing state information, or clearing the current state. Each supported state type has a unique implementation of methods that correspond to the data structure implemented.
Each grouping key observed has dedicated state information.
- Records are emitted based on the logic you implement and using the output schema you specify. See Emit records.
- You can access values in the state store using the state store reader. This reader has batch functionality and is not intended for low-latency workloads. See Read Structured Streaming state information.
- Logic specified using `handleInputRows` only fires if records for the key are present in a micro-batch. See Handle input rows.
- Use `handleExpiredTimer` to implement time-based logic that doesn't depend on observing records to fire. See Program timed events.
Note
State objects are isolated by grouping keys with the following implications:
- State values cannot be impacted by records associated with a different grouping key.
- You cannot implement logic that depends on comparing values or updating state across grouping keys.
You can compare values within a grouping key. Use a `MapState` to implement logic with a second key that your custom logic can use. For example, grouping by `user_id` and keying your `MapState` using IP address would allow you to implement logic that tracks simultaneous user sessions.
Advanced considerations for working with state
Writing to a state variable triggers a write to RocksDB. For optimized performance, Databricks recommends processing all values in the iterator for a given key and committing updates in a single write whenever possible.
Note
State updates are fault-tolerant. If a task crashes before a micro-batch has finished processing, the value from the last successful micro-batch is used on retry.
State values do not have any built-in defaults. If your logic requires reading existing state information, use the `exists` method while implementing your logic.
Note
`MapState` variables have additional functionality to check for individual keys or list all keys to implement logic for null state.
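For example, continuing the `SimpleCounterProcessor` example above, a defensive read might look like this sketch:

```python
# Fall back to an explicit default when no state has been written yet for this key.
count = self.value_state.get()[0] if self.value_state.exists() else 0

# Map state can also be probed per key rather than for the variable as a whole.
user_key = ("user_key",)
if self.map_state.exists() and self.map_state.containsKey(user_key):
    current = self.map_state.getValue(user_key)[0]
```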
Emit records
User-defined logic controls how `transformWithState` emits records. Records are emitted per grouping key.
Custom stateful applications make no assumptions about how state information is used when determining how to emit records, and the returned number of records for a given condition can be none, one, or many.
You implement logic to emit records using either `handleInputRows` or `handleExpiredTimer`. See Handle input rows and Program timed events.
Note
You can implement multiple state values and define multiple conditions for emitting records, but all records emitted should use the same schema.
Python
In Python, you define your output schema using the `outputStructType` keyword while calling `transformWithStateInPandas`.
You emit records using a pandas DataFrame object and `yield`.
You can optionally `yield` an empty DataFrame. When combined with `update` output mode, emitting an empty DataFrame updates the values for the grouping key to be null.
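For example, a `handleInputRows` variant of the `SimpleCounterProcessor` above might emit a row only when a hypothetical threshold is reached, and otherwise yield an empty DataFrame:

```python
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
    total = sum(len(pdf) for pdf in rows)
    if total >= 10:  # hypothetical threshold
        yield pd.DataFrame({"id": [key[0]], "countAsString": [str(total)]})
    else:
        # With update output mode, an empty DataFrame sets the key's output values to null.
        yield pd.DataFrame({"id": [], "countAsString": []})
```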
Scala
In Scala, you emit records using an `Iterator` object. The schema of the output is derived from the emitted records.
You can optionally emit an empty `Iterator`. When combined with `update` output mode, emitting an empty `Iterator` updates the values for the grouping key to be null.
Handle input rows
Use the `handleInputRows` method to define the logic for how records observed in your streaming query interact with and update state values. The handler you define with the `handleInputRows` method runs each time any records are processed through your Structured Streaming query.
For most stateful applications implemented with `transformWithState`, the core logic is defined using `handleInputRows`.
For each micro-batch update processed, all records in the micro-batch for a given grouping key are available using an iterator. User-defined logic can interact with all records from the current micro-batch and values in the state store.
Program timed events
You can use timers to implement custom logic based on elapsed time from a specified condition.
You work with timers by implementing a `handleExpiredTimer` method.
Within a grouping key, timers are uniquely identified by their timestamp.
When a timer expires, the result is determined by the logic implemented in your application. Common patterns include:
- Emitting information stored in a state variable.
- Evicting stored state information.
- Creating a new timer.
Expired timers fire even if no records for their associated key are processed in a micro-batch.
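As a sketch of these patterns in Python, the handler below emits the stored count, evicts it, and schedules the next firing. It assumes the processor stored the handle in `init` (`self.handle = handle`), that processing-time timers are enabled (`timeMode="processingTime"`), and that `expiredTimerInfo.getExpiryTimeInMs()` is available as in the PySpark API.

```python
def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
    # Emit the current state for this key each time a timer fires.
    if self.value_state.exists():
        yield pd.DataFrame({"id": [key[0]], "countAsString": [str(self.value_state.get()[0])]})
        # Evict the stored state after emitting it.
        self.value_state.clear()
    # Schedule the next firing 30 seconds after this timer expired.
    self.handle.registerTimer(expiredTimerInfo.getExpiryTimeInMs() + 30_000)
```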
Specify the time model
When passing your `StatefulProcessor` to `transformWithState`, you must specify the time model. The following options are supported:
- `ProcessingTime`
- `EventTime`
- `NoTime` or `TimeMode.None()`
Specifying `NoTime` means that timers are not supported for your processor.
Built-in timer values
Databricks recommends against invoking the system clock in your custom stateful application, as this can lead to unreliable retries on task failure. Use the methods in the `TimerValues` class when you must access the processing time or watermark:
| `TimerValues` method | Description |
|---|---|
| `getCurrentProcessingTimeInMs` | Returns the timestamp of the processing time for the current batch in milliseconds since epoch. |
| `getCurrentWatermarkInMs` | Returns the timestamp of the watermark for the current batch in milliseconds since epoch. |
Note
Processing time describes the time that the micro-batch is processed by Apache Spark. Many streaming sources, such as Kafka, also include system processing time.
Watermarks on streaming queries are often defined against event time or the processing time of the streaming source. See Apply watermarks to control data processing thresholds.
Both watermarks and windows can be used in combination with `transformWithState`. You might implement similar functionality in your custom stateful application by leveraging TTL, timers, and `MapState` or `ListState` functionality.
What is state time to live (TTL)?
The state values used by `transformWithState` each support an optional time to live (TTL) specification. When the TTL expires, the value is evicted from the state store. TTL only interacts with values in the state store, meaning you can implement logic to evict state information, but you cannot directly trigger logic as TTL evicts state values.
Important
If you do not implement TTL, you must handle state eviction using other logic to avoid endless state growth.
TTL is enforced for each state value, with different rules for each state type.
- State variables are scoped to grouping keys.
- For `ValueState` objects, only a single value is stored per grouping key. TTL applies to this value.
- For `ListState` objects, the list can contain many values. TTL applies to each value in a list independently.
- For `MapState` objects, each map key has an associated state value. TTL applies independently to each key-value pair in a map.
For all state types, TTL resets if the state information is updated.
Note
While TTL is scoped to individual values in a `ListState`, the only way to update a value in a list is to use the `put` method to overwrite the entire contents of the `ListState` variable.
What is the difference between timers and TTL?
There is some overlap between timers and time to live (TTL) for state variables, but timers provide a broader set of features than TTL.
TTL evicts state information that has not been updated for the period specified by the user. This allows users to prevent unchecked state growth and remove stale state entries. Because maps and lists implement TTL for each value, you can implement functions that only consider state values that have been updated recently by setting TTL.
Timers allow you to define custom logic beyond state eviction, including emitting records. You can optionally use timers to clear state information for a given state value, with the additional flexibility to emit values or trigger other conditional logic based on the timer.