Build a custom stateful application

Important

This feature is in Public Preview in Databricks Runtime 16.2 and above.

You can build streaming applications using custom stateful operators to implement low-latency and near real-time solutions that use arbitrary stateful logic. Custom stateful operators unlock new operational use cases and patterns unavailable through traditional Structured Streaming processing.

Note

Databricks recommends using built-in Structured Streaming functionality for supported stateful operations such as aggregations, deduplication, and streaming joins. See What is stateful streaming?.

Databricks recommends using transformWithState over legacy operators for arbitrary state transformations. For documentation on the legacy flatMapGroupsWithState and mapGroupsWithState operators, see Legacy arbitrary stateful operators.

Requirements

The transformWithState operator and the related APIs and classes have the following requirements:

  • Available in Databricks Runtime 16.2 and above.
  • Compute must use dedicated or no-isolation access mode.
  • You must use the RocksDB state store provider. Databricks recommends enabling RocksDB as part of the compute configuration.

Note

To enable the RocksDB state store provider for the current session, run the following:

spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

What is transformWithState?

The transformWithState operator applies a custom stateful processor to a Structured Streaming query. You must implement a custom stateful processor to use transformWithState. Structured Streaming includes APIs for building your stateful processor using Python, Scala, or Java.

You use transformWithState to apply custom logic to a grouping key for records processed incrementally with Structured Streaming. The following describes the high-level design:

  • Define one or more state variables.
  • State information is persisted for each grouping key and can be accessed for each state variable according to user-defined logic.
  • For each micro-batch processed, all records for the key are available as an iterator.
  • Use built-in handles to control when and how records are emitted based on timers and user-defined conditions.
  • State values support individual time-to-live (TTL) definitions, allowing for flexibility in managing state expiration and state size.

Important

PySpark uses the operator transformWithStateInPandas instead of transformWithState. Azure Databricks documentation uses transformWithState to describe functionality for both Python and Scala implementations.

The Scala and Python implementations of transformWithState and related APIs differ due to language specifics but provide the same functionality. Refer to language-specific examples and API documentation for your preferred programming language.

Built-in processing handles

You implement the core logic of your custom stateful application as handlers that use built-in handles.

  • Handles provide the methods to interact with state values and timers, process incoming records, and emit records.
  • Handlers define your custom event-driven logic.

Handles for each state type are implemented based on the underlying data structure, but each contains functionality to get, put, update, and delete records.

Handlers are implemented based on either events observed in input records or timers, using the following semantics:

  • Define a handler using the handleInputRows method to control how data is processed, how state is updated, and how records are emitted for each micro-batch of records processed for the grouping key. See Handle input rows.
  • Define a handler using the handleExpiredTimer method to use time-based thresholds to run logic whether or not additional records are processed for the grouping key. See Program timed events.

The following table compares the functional behaviors supported by these handlers:

Behavior | handleInputRows | handleExpiredTimer
Get, put, update, or clear state values | Yes | Yes
Create or delete a timer | Yes | Yes
Emit records | Yes | Yes
Iterate over records in the current micro-batch | Yes | No
Trigger logic based on elapsed time | No | Yes

You can combine handleInputRows and handleExpiredTimer to implement complex logic as needed.

For example, you could implement an application that uses handleInputRows to update state values for each micro-batch and set a timer 10 seconds in the future. If no additional records are processed before the timer expires, you can use handleExpiredTimer to emit the current values in the state store. If new records are processed for the grouping key, you can clear the existing timer and set a new one.
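
The following Python sketch shows one way to implement this pattern. It is a minimal sketch, assuming the query runs with timeMode="ProcessingTime" and that input rows include an integer value column; the processor, state, and output column names are illustrative, not part of the API.

import pandas as pd
from typing import Iterator
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle

class TimedFlushProcessor(StatefulProcessor):
  def init(self, handle: StatefulProcessorHandle) -> None:
    self.handle = handle
    self.running_total = handle.getValueState("runningTotal", "total int")

  def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
    # New records arrived: clear any pending timer for this grouping key
    for old_timer in self.handle.listTimers():
      self.handle.deleteTimer(old_timer)
    total = self.running_total.get()[0] if self.running_total.exists() else 0
    for pdf in rows:
      total += int(pdf["value"].sum())  # Assumes an integer "value" column
    self.running_total.update((total,))
    # Schedule a flush 10 seconds of processing time in the future
    self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10 * 1000)
    return iter([])  # Emit nothing until the timer fires

  def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
    # No new records arrived within 10 seconds: emit and clear the stored total
    total = self.running_total.get()[0]
    self.running_total.clear()
    yield pd.DataFrame({"id": [key[0]], "total": [total]})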

Custom state types

You can implement multiple state objects in a single stateful operator. The names you give to each state object persist in the state store, which you can access with the state store reader. If your state object uses a StructType, you provide names for each field in the struct while passing the schema. These names are also visible when reading the state store. See Read Structured Streaming state information.

The functionality provided by built-in classes and operators is meant to provide flexibility and extensibility, and implementation choices should be informed by the complete logic your application needs to run. For example, you might implement nearly identical logic using a ValueState grouped by fields user_id and session_id or a MapState grouped by user_id where session_id is the key for the MapState. In this instance, a MapState might be the preferred implementation if logic needs to evaluate conditions across multiple session_ids.

The following sections describe the state types supported by transformWithState.

ValueState

For each grouping key, there is an associated value.

A value state can include complex types, such as a struct or tuple. When you update a ValueState, you implement logic to replace the entire value. The TTL for a value state resets when the value updates but is not reset if a source key matching a ValueState is processed without updating the stored ValueState.

ListState

For each grouping key, there is an associated list.

A list state is a collection of values, each of which can include complex types. Each value in a list has its own TTL. You can add items to a list by appending individual items, appending a list of items, or overwriting the entire list with a put. Only the put operation is considered an update for resetting TTL.
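
For example, in Python you might work with a list state as follows. This is a sketch that assumes a list state named self.list_state with a single integer column was created in init; values are passed as tuples.

# Inside handleInputRows
self.list_state.put([(1,), (2,)])         # Overwrites the entire list; resets TTL
self.list_state.appendValue((3,))         # Appends one value; does not reset TTL
self.list_state.appendList([(4,), (5,)])  # Appends several values; does not reset TTL
stored = [row[0] for row in self.list_state.get()]  # Iterates over all stored values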

MapState

For each grouping key, there is an associated map. Maps are the Apache Spark functional equivalent to a Python dict.

Important

Grouping keys describe the fields specified in the GROUP BY clause of your Structured Streaming query. Map states contain an arbitrary number of key-value pairs for a grouping key.

For example, if you group by user_id and want to define a map for each session_id, your grouping key is user_id and the key in your map is session_id.

A map state is a collection of distinct keys that each map to a value that can include complex types. Each key-value pair in a map has its own TTL. You can update the value of a specific key or remove a key and its value. You can return an individual value using its key, list all keys, list all values, or return an iterator to work with the complete set of key-value pairs in the map.
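
The following Python fragment sketches these per-key operations. It assumes a map state named self.sessions with a string user key and an integer value was created in init, that keys and values are passed as tuples, and that the iterator yields (key, value) tuple pairs.

# Inside handleInputRows
session_key = ("session_42",)
count = 0
if self.sessions.exists() and self.sessions.containsKey(session_key):
  count = self.sessions.getValue(session_key)[0]
self.sessions.updateValue(session_key, (count + 1,))  # Upsert one key-value pair
all_keys = list(self.sessions.keys())                 # All map keys for this grouping key
totals = {k[0]: v[0] for k, v in self.sessions.iterator()}  # Iterate over key-value pairs
self.sessions.removeKey(session_key)                  # Remove a single key and its value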

Initialize a custom state variable

When you initialize your StatefulProcessor, you create a local variable for each state object that allows you to interact with state objects in your custom logic. State variables are defined and initialized by overriding the built-in init method in the StatefulProcessor class.

You define an arbitrary number of state objects using the getValueState, getListState, and getMapState methods while initializing your StatefulProcessor.

Each state object must have the following:

  • A unique name
  • A schema specified
    • In Python, the schema is specified explicitly.
    • In Scala, pass an Encoder to specify state schema.

You can also provide an optional time-to-live (TTL) duration in milliseconds. If implementing a map state, you must provide a separate schema definition for the map keys and the values.
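
The following Python sketch shows a processor init that combines these options. The state names and the 10- and 30-minute TTL values are illustrative, and the ttlDurationMs keyword follows the PySpark API.

import pandas as pd
from typing import Iterator
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle

class TtlExampleProcessor(StatefulProcessor):
  def init(self, handle: StatefulProcessorHandle) -> None:
    # Value state that expires 10 minutes after its last update
    self.recent_total = handle.getValueState(
      stateName="recentTotal", schema="total int", ttlDurationMs=10 * 60 * 1000)
    # List state without TTL
    self.events = handle.getListState(stateName="events", schema="event string")
    # Map state requires separate schemas for the map key and the value
    self.sessions = handle.getMapState(
      stateName="sessions", userKeySchema="session_id string",
      valueSchema="count int", ttlDurationMs=30 * 60 * 1000)

  def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
    return iter([])  # State handling logic omitted in this sketch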

Note

Logic for how state information is queried, updated, and emitted is handled separately. See Use your state variables.

Example stateful application

The following demonstrates the basic syntax for defining and using a custom stateful processor with transformWithState, including example state variables for each supported type. For more examples, see Example stateful applications.

Note

Python uses tuples for all interactions with state values. This means Python code should pass values as tuples when using operations such as put and update, and expect tuples to be returned when using get.

For example, if the schema for your value state is just a single integer, you would implement code like the following:

current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0]  # Extracts the first item in the tuple
new_value = current_value + 1           # Calculate a new value
value_state.update((new_value,))        # Pass the new value formatted as a tuple

This is also true for items in a ListState or values in a MapState.

Python

import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator

spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

output_schema = StructType(
    [
        StructField("id", StringType(), True),
        StructField("countAsString", StringType(), True),
    ]
)

class SimpleCounterProcessor(StatefulProcessor):
  def init(self, handle: StatefulProcessorHandle) -> None:
    value_state_schema = StructType([StructField("count", IntegerType(), True)])
    list_state_schema = StructType([StructField("count", IntegerType(), True)])
    self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
    self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
    # Schema can also be defined using strings and SQL DDL syntax
    self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")

  def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
    count = 0
    for pdf in rows:
      list_state_rows = [(120,), (20,)] # A list of tuples
      self.list_state.put(list_state_rows)
      self.list_state.appendValue((111,))
      self.list_state.appendList(list_state_rows)
      pdf_count = pdf.count()
      count += pdf_count.get("value")
    self.value_state.update((count,)) # Count is passed as a tuple
    list_state_iter = self.list_state.get()
    list_state_value = next(list_state_iter)[0]
    value = count
    user_key = ("user_key",)
    if self.map_state.exists():
      if self.map_state.containsKey(user_key):
        value += self.map_state.getValue(user_key)[0]
    self.map_state.updateValue(user_key, (value,)) # Value is a tuple
    yield pd.DataFrame({"id": key, "countAsString": str(count)})

q = (df.groupBy("key")
  .transformWithStateInPandas(
    statefulProcessor=SimpleCounterProcessor(),
    outputStructType=output_schema,
    outputMode="Update",
    timeMode="None",
  )
  .writeStream...
)

Scala

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders, DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
  @transient private var countState: ValueState[Int] = _
  @transient private var listState: ListState[Int] = _
  @transient private var mapState: MapState[String, Int] = _

  override def init(
      outputMode: OutputMode,
      timeMode: TimeMode): Unit = {
    countState = getHandle.getValueState[Int]("countState",
      Encoders.scalaInt, TTLConfig.NONE)
    listState = getHandle.getListState[Int]("listState",
      Encoders.scalaInt, TTLConfig.NONE)
    mapState = getHandle.getMapState[String, Int]("mapState",
      Encoders.STRING, Encoders.scalaInt, TTLConfig.NONE)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, String)],
      timerValues: TimerValues): Iterator[(String, String)] = {
    var count = countState.getOption().getOrElse(0)
    for (row <- inputRows) {
      val listData = Array(120, 20)
      listState.put(listData)
      listState.appendValue(count)
      listState.appendList(listData)
      count += 1
    }
    val iter = listState.get()
    var listStateValue = 0
    if (iter.hasNext) {
      listStateValue = iter.next()
    }
    countState.update(count)
    var value = count
    val userKey = "userKey"
    if (mapState.exists()) {
      if (mapState.containsKey(userKey)) {
        value += mapState.getValue(userKey)
      }
    }
    mapState.updateValue(userKey, value)
    Iterator((key, count.toString))
  }
}

val q = spark
        .readStream
        .format("delta")
        .load("$srcDeltaTableDir")
        .as[(String, String)]
        .groupByKey(x => x._1)
        .transformWithState(
            new SimpleCounterProcessor(),
            TimeMode.None(),
            OutputMode.Update(),
        )
        .writeStream...

StatefulProcessorHandle

PySpark includes the StatefulProcessorHandle class to provide access to functions that control how your user-defined Python code interacts with state information. You must always import StatefulProcessorHandle and pass it as the handle parameter when initializing a StatefulProcessor.

The handle variable ties the local variable in your Python class to the state variable.

Note

Scala uses the getHandle method.

Specify initial state

You can optionally provide an initial state to use with the first micro-batch. This can be useful when migrating an existing workflow to a new custom application, upgrading a stateful operator to change your schema or logic, or recovering from a failure that cannot be repaired automatically and requires manual intervention.

Note

Use the state store reader to query state information from an existing checkpoint. See Read Structured Streaming state information.

If you are converting an existing Delta table to a stateful application, read the table using spark.read.table("table_name") and pass the resulting DataFrame. You can optionally select or modify fields to conform to your new stateful application.

You provide an initial state using a DataFrame with the same grouping key schema as the input rows.

Note

Python uses handleInitialState to specify the initial state while defining a StatefulProcessor. Scala uses the distinct class StatefulProcessorWithInitialState.
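
The following Python sketch seeds state from a hypothetical Delta table named existing_counts whose schema includes the grouping key column key and a count column; the initialState argument and the handleInitialState signature follow the PySpark API, and the processor and column names are illustrative.

import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle

class CounterWithInitialState(StatefulProcessor):
  def init(self, handle: StatefulProcessorHandle) -> None:
    self.count = handle.getValueState("count", "count int")

  def handleInitialState(self, key, initialState, timerValues) -> None:
    # initialState is a pandas DataFrame holding the initial rows for this grouping key
    self.count.update((int(initialState["count"].iloc[0]),))

  def handleInputRows(self, key, rows, timerValues):
    total = self.count.get()[0] if self.count.exists() else 0
    for pdf in rows:
      total += len(pdf)
    self.count.update((total,))
    yield pd.DataFrame({"key": [key[0]], "count": [total]})

initial_df = spark.read.table("existing_counts")  # Hypothetical existing Delta table

q = (df.groupBy("key")
  .transformWithStateInPandas(
    statefulProcessor=CounterWithInitialState(),
    outputStructType="key string, count int",
    outputMode="Update",
    timeMode="None",
    initialState=initial_df.groupBy("key"),
  )
  .writeStream...
)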

Use your state variables

Supported state objects provide methods for getting state, updating existing state information, or clearing the current state. Each supported state type has a unique implementation of methods that correspond to the data structure implemented.

Each grouping key observed has dedicated state information.

Note

State objects are isolated by grouping keys with the following implications:

  • State values cannot be impacted by records associated with a different grouping key.
  • You cannot implement logic that depends on comparing values or updating state across grouping keys.

You can compare values within a grouping key. Use a MapState to introduce a second key within the grouping key that your custom logic can use. For example, grouping by user_id and keying your MapState by IP address would allow you to implement logic that tracks simultaneous user sessions.

Advanced considerations for working with state

Writing to a state variable triggers a write to RocksDB. For optimized performance, Databricks recommends processing all values in the iterator for a given key and committing updates in a single write whenever possible.

Note

State updates are fault-tolerant. If a task crashes before a micro-batch has finished processing, the value from the last successful micro-batch is used on retry.

State values do not have any built-in defaults. If your logic requires reading existing state information, use the exists method while implementing your logic.
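
For example, a defensive read of a value state in Python looks like the following sketch, assuming a single-integer value state named self.value_state:

# Inside handleInputRows
if self.value_state.exists():
  (current,) = self.value_state.get()
else:
  current = 0                      # Supply your own default when no state exists yet
self.value_state.update((current + 1,))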

Note

MapState variables have additional functionality to check for individual keys or list all keys to implement logic for null state.

Emit records

User-defined logic controls how transformWithState emits records. Records are emitted per grouping key.

Custom stateful applications make no assumptions about how state information is used when determining how to emit records, and the number of records returned for a given condition can be zero, one, or many.

You implement logic to emit records using either handleInputRows or handleExpiredTimer. See Handle input rows and Program timed events.

Note

You can implement multiple state values and define multiple conditions for emitting records, but all records emitted should use the same schema.

Python

In Python, you define your output schema using the outputStructType keyword while calling transformWithStateInPandas.

You emit records using a pandas DataFrame object and yield.

You can optionally yield an empty DataFrame. When combined with update output mode, emitting an empty DataFrame updates the values for the grouping key to be null.
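
For example, a minimal emit-only processor might look like the following sketch; the processor name, output columns, and the 10-record threshold are illustrative.

import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle

class EmitExampleProcessor(StatefulProcessor):
  def init(self, handle: StatefulProcessorHandle) -> None:
    pass  # No state needed for this emission-only sketch

  def handleInputRows(self, key, rows, timerValues):
    count = sum(len(pdf) for pdf in rows)
    if count >= 10:
      # Emit one record that matches the outputStructType passed to transformWithStateInPandas
      yield pd.DataFrame({"id": [key[0]], "countAsString": [str(count)]})
    else:
      # Empty DataFrame: with update output mode, this sets the grouping key's values to null
      yield pd.DataFrame({"id": [], "countAsString": []})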

Scala

In Scala, you emit records using an Iterator object. The schema of the output is derived from emitted records.

You can optionally emit an empty Iterator. When combined with update output mode, emitting an empty Iterator updates the values for the grouping key to be null.

Handle input rows

Use the handleInputRows method to define the logic for how records observed in your streaming query interact with and update state values. The handler you define with the handleInputRows method runs each time any records are processed through your Structured Streaming query.

For most stateful applications implemented with transformWithState, the core logic is defined using handleInputRows.

For each micro-batch update processed, all records in the micro-batch for a given grouping key are available using an iterator. User-defined logic can interact with all records from the current micro-batch and values in the state store.

Program timed events

You can use timers to implement custom logic based on elapsed time from a specified condition.

You work with timers by implementing a handleExpiredTimer method.

Within a grouping key, timers are uniquely identified by their timestamp.

When a timer expires, the result is determined by the logic implemented in your application. Common patterns include:

  • Emitting information stored in a state variable.
  • Evicting stored state information.
  • Creating a new timer.

Expired timers fire even if no records for their associated key are processed in a micro-batch.

Specify the time model

When passing your StatefulProcessor to transformWithState, you must specify the time model. The following options are supported:

  • ProcessingTime
  • EventTime
  • NoTime or TimeMode.None()

Specifying NoTime means that timers are not supported for your processor.

Built-in timer values

Databricks recommends against invoking the system clock in your custom stateful application, as this can lead to unreliable retries on task failure. Use the methods in the TimerValues class when you must access the processing time or watermark:

TimerValues method | Description
getCurrentProcessingTimeInMs | Returns the timestamp of the processing time for the current batch in milliseconds since epoch.
getCurrentWatermarkInMs | Returns the timestamp of the watermark for the current batch in milliseconds since epoch.

What is state time to live (TTL)?

The state values used by transformWithState each support an optional time to live (TTL) specification. When TTL expires, the value is evicted from the state store. TTL only interacts with values in the state store, meaning you can implement logic to evict state information, but you cannot directly trigger logic as TTL evicts state values.

Important

If you do not implement TTL, you must handle state eviction using other logic to avoid endless state growth.

TTL is enforced for each state value, with different rules for each state type.

  • State variables are scoped to grouping keys.
  • For ValueState objects, only a single value is stored per grouping key. TTL applies to this value.
  • For ListState objects, the list can contain many values. TTL applies to each value in a list independently.
  • For MapState objects, each map key has an associated state value. TTL applies independently to each key-value pair in a map.

For all state types, TTL resets if the state information is updated.

Note

While TTL is scoped to individual values in a ListState, the only way to update a value in a list is to use the put method to overwrite the entire contents of the ListState variable.

What is the difference between timers and TTL?

There is some overlap between timers and time to live (TTL) for state variables, but timers provide a broader set of features than TTL.

TTL evicts state information that has not been updated for the period specified by the user. This allows users to prevent unchecked state growth and remove stale state entries. Because maps and lists apply TTL to each value, you can set TTL to implement functions that only consider state values that have been updated recently.

Timers allow you to define custom logic beyond state eviction, including emitting records. You can optionally use timers to clear state information for a given state value, with the additional flexibility to emit values or trigger other conditional logic based on the timer.