ステートフル ストリーミングとは
"ステートフルな" 構造化ストリーミング クエリでは、中間状態情報の増分更新が必要です。一方、"ステートレスな" 構造化ストリーミング クエリでは、ソースからシンクに対して処理された行に関する情報のみが追跡されます。
ステートフル操作には、ストリーミングの集約、ストリーミングの dropDuplicates
、ストリーム ストリーム結合、mapGroupsWithState
、flatMapGroupsWithState
が含まれます。
ステートフルな構造化ストリーミング クエリに必要な中間状態情報は、適切に構成されていない場合、予期しない待機時間と運用環境の問題につながる可能性があります。
Databricks Runtime 13.2 LTS 以降では、構造化ストリーミング ワークロードのチェックポイント期間とエンドツーエンドの待機時間を短縮するために、RockDB の変更ログのチェックポイント処理を有効にすることができます。 Databricks では、すべての構造化ストリーミング ステートフル クエリに対して変更ログのチェックポイント処理を有効にすることをお勧めします。 変更ログのチェックポイント処理を有効にするを参照してください。
ステートフルな構造化ストリーミング クエリを最適化する
ステートフルな構造化ストリーミング クエリの中間状態の情報を管理することにより、予期せぬ待機時間や運用環境の問題を防ぐことができます。
Databricks では次を推奨しています。
- コンピューティングに最適化されたインスタンスをワーカーとして使用します。
- シャッフル パーティションの数を、クラスター内のコア数の 1 倍から 2 倍に設定します。
- SparkSession の
spark.sql.streaming.noDataMicroBatches.enabled
構成をfalse
に設定します。 これにより、ストリーミング マイクロバッチ エンジンは、データを含まないマイクロバッチを処理できなくなります。 また、この構成をfalse
に設定すると、即時ではなく新しいデータが到着するまで、ウォーターマークや処理時間のタイムアウトを活用するステートフル操作によってデータ出力が得られない可能性があることに注意してください。
Databricks では、ステートフル ストリームの状態を管理するために、RocksDB と変更ログのチェックポイント処理を使用することをお勧めします。 「Azure Databricks で RocksDB 状態ストアを構成する」をご覧ください。
Note
状態管理スキームは、クエリの再起動間で変更できません。 つまり、既定の管理を使用してクエリを開始した場合、新しいチェックポイントの場所でクエリを最初から開始しない限り、クエリを変更することはできません。
構造化ストリーミングで複数のステートフル演算子を操作する
Databricks Runtime 13.3 LTS 以降では、Azure Databricks により、構造化ストリーミング ワークロードのステートフル演算子に高度なサポートが提供されます。 複数のステートフル演算子を連結できるようになりました。つまり、ウィンドウ集計などの操作の出力を結合などの別のステートフル操作にフィードできます。
次の例では、使用できるいくつかのパターンを示します。
重要
複数のステートフル演算子を操作する場合、次の制限があります。
FlatMapGroupWithState
はサポートされません。- 追加出力モードのみがサポートされています。
チェーンされた時間枠の集計
Python
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
2 つの異なるストリームの時間枠の集計の後にストリーム同士の枠の結合が続く
Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
ストリーム同士の時間間隔結合の後に時間枠の集計が続く
Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
構造化ストリーミングの状態の再調整
Delta Live Tables のすべてのストリーミング ワークロードに対して、状態の再調整が既定で有効になっています。 Databricks Runtime 11.3 LTS 以降では、Spark クラスター構成で次の構成オプションを設定して、状態の再調整を有効にすることができます。
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
状態を再調整すると、クラスターのサイズ変更イベントが発生するステートフルな構造化ストリーミング パイプラインにメリットがあります。 クラスター サイズの変化に関わらず、ステートレス ストリーミング操作には、メリットがありません。
Note
コンピューティングの自動スケールには、構造化ストリーミング ワークロードのクラスター サイズのスケールダウンに制限があります。 Databricks では、ストリーミング ワークロードに、拡張自動スケーリングを備えた Delta Live Tables を使用することをお勧めします。 拡張自動スケーリングを使用して Delta Live Tables パイプラインのクラスター使用率を最適化するを参照してください。
クラスターのサイズ変更イベントにより、状態の再調整がトリガーされます。 再調整イベント中に、状態がクラウド ストレージから新しい Executor に読み込まれるため、マイクロバッチの待機時間が長くなる場合があります。
mapGroupsWithState
の初期状態を指定する
flatMapGroupsWithState
または mapGroupsWithState
を使用して構造化ストリーミング ステートフル処理のユーザー定義初期状態を指定できます。 これにより、有効なチェックポイントなしでステートフル ストリームを開始するときにデータの再処理を回避できます。
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
flatMapGroupsWithState
演算子の初期状態を指定する使用例は、次のとおりです。
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
mapGroupsWithState
演算子の初期状態を指定する使用例は、次のとおりです。
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
mapGroupsWithState
の更新関数をテストする
TestGroupState
API を使用すると、Dataset.groupByKey(...).mapGroupsWithState(...)
と Dataset.groupByKey(...).flatMapGroupsWithState(...)
に使用される状態更新関数をテストできます。
状態更新関数は、GroupState
型のオブジェクトを使用して、前の状態を入力として受け取ります。 Apache Spark の GroupState リファレンス ドキュメントを参照してください。 例:
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}