次の方法で共有


構造化ストリーミングの出力モードを選択する

この記事では、ステートフル ストリーミングの出力モードの選択について説明します。 集約を含むステートフル ストリームのみが出力モードの構成を必要とします。

結合では追加出力モードのみがサポートされ、出力モードは重複除去に影響しません。 任意のステートフル演算子 mapGroupsWithState および flatMapGroupsWithState は、独自のカスタム ロジックを使用してレコードを出力するため、ストリームの出力モードは動作に影響しません。

ステートレス ストリーミングの場合、すべての出力モードは同じように動作します。

出力モードを正しく構成するには、ステートフル ストリーミング、ウォーターマーク、トリガーを理解する必要があります。 次の記事をご覧ください。

出力モードとは

構造化ストリーミング クエリの出力モードでは、各トリガー中にクエリの演算子が出力するレコードが決まります。 出力できるレコードの種類は次の 3 つです。

  • 今後の処理で変更されないレコード。
  • 最後のトリガー以降に変更されたレコード。
  • 状態テーブル内のすべてのレコード。

ステートフル演算子によって生成される特定の行がトリガーによって変わる場合があるため、出力するレコードの種類を把握することはステートフル演算子にとって重要です。 たとえば、ストリーミング集計演算子が特定のウィンドウに対してより多くの行を受け取る場合、そのウィンドウの集計値はトリガー間で変更される可能性があります。

ステートレス演算子の場合、レコード型の区別は演算子の動作に影響しません。 ステートレス演算子がトリガー中に出力するレコードは、常にそのトリガーの間に処理されるソース レコードです。

使用可能な出力モード

特定のトリガー中に出力するレコードを演算子に指示する 3 つの出力モードがあります。

出力モード 説明
追加モード (既定) 既定では、ストリーミング クエリは追加モードで実行されます。 このモードでは、演算子は今後のトリガーで変更されない行のみを出力します。 ステートフル演算子では、ウォーターマークを使用して、これがいつ発生するかを判断します。
更新モード 更新モードでは、出力されたレコードが後続のトリガーで変更される可能性がある場合でも、トリガー中に変更されたすべての行が演算子によって出力されます。
完全モード 完全モードは、ストリーミング集計でのみ機能します。 完全モードでは、演算子によって生成されたすべての結果行がダウンストリームに出力されます。

実稼働に関する考慮事項

多くのステートフル ストリーミング操作では、追加および更新モードを選択する必要があります。 以下のセクションでは、決断する際に役立つ可能性がある考慮事項を概説します。

Note

完全モードにはいくつかの用途がありますが、データが大きくなるにつれてパフォーマンスが低下するおそれがあります。 Databricks は、多くのステートフルなオペレーションに対して増分処理を行う完全モードに関連したセマンティックな保証を得るために、具体化されたビューを使用することをお勧めします。 「Databricks SQL の具体化されたビューを使用する」を参照してください。

アプリケーション セマンティクス

アプリケーション セマンティクスは、ダウンストリーム アプリケーションがストリーミング データを使用する方法を記述します。

ダウンストリーム サービスがダウンストリームの書き込みごとに 1 つのアクションを実行する必要がある場合は、ほとんどの場合、追加モードを使用します。 たとえば、シンクに書き込まれるすべての新しいレコードに対して通知を送信するダウンストリーム通知サービスがある場合、追加モードでは各レコードが確実に 1 回だけ書き込まれます。 更新モードでは、状態情報が変更されるたびにレコードが書き込まれ、結果として多数の更新が行われます。

ダウンストリーム サービスで新しい結果が必要な場合は、更新モードによってシンクが可能な限り最新の状態に確保されます。 たとえば、リアルタイムで特徴を読み取る機械学習モデルや、リアルタイム集計を追跡する分析ダッシュボードなどがあります。

演算子とシンクの互換性

構造化ストリーミングでは、Apache Spark で使用できるすべての操作がサポートされるわけではありません。また、一部のストリーミング操作はすべての出力モードでサポートされているわけではありません。 演算子の制限の詳細については、OSS ストリーミングに関するドキュメントを参照してください。

すべてのシンクですべての出力モードがサポートされているわけではありません。 すべての Unity Catalog マネージド テーブルをサポートする Delta Lake と Kafka は、両方ともすべての出力モードをサポートします。 シンクの互換性の詳細については、OSS ストリーミングに関するドキュメントを参照してください。

待機時間とコスト

出力モードは、レコードを書き込む前に経過する必要がある時間に影響し、書き込まれるデータの頻度と量は、ストリーミング パイプラインに関連するコストに影響する可能性があります。

追加モードでは、ステートフル演算子は、ステートフルな結果が確定した後にのみ結果を発行するように強制されます。これは、少なくともウォーターマークの延期期間と同じ長さです。 追加出力モードでウォーターマークの延期期間が 1 hour の場合は、レコードがダウンストリームに出力されるまでに少なくとも 1 時間の遅延があることを意味します。

更新モードでは、集約値 1 つにつき、トリガー毎に、書き込みが 1 回行われます。 レコードごとの各書き込みにシンク料金が発生する場合、ウォーターマークの延期期間が経過する前にレコードが何度も更新されると、コストが高くなる可能性があります。

構成の例

次のコード例は、Unity Catalog テーブルへの更新をストリーミングするための出力モードの構成を示しています。

Python

# Append output mode (default)
(df.writeStream
  .toTable("target_table")
)

# Append output mode (same as default behavior)
(df.writeStream
  .outputMode("append")
  .toTable("target_table")
)

# Update output mode
(df.writeStream
  .outputMode("update")
  .toTable("target_table")
)

# Complete output mode
(df.writeStream
  .outputMode("complete")
  .toTable("target_table")
)

Scala

// Append output mode (default)
df.writeStream
  .toTable("target_table")

// Append output mode (same as default behavior)
df.writeStream
  .outputMode("append")
  .toTable("target_table")

// Update output mode
df.writeStream
  .outputMode("update")
  .toTable("target_table")

// Complete output mode
df.writeStream
  .outputMode("complete")
  .toTable("target_table")

PySpark DataStreamWriter.outputMode または Scala DataStreamWriter.outputMode の OSS のドキュメントを参照してください。

ステートフルなストリーミングおよび出力モードの例

次の例は、ステートフル ストリーミングのために出力モードがウォーターマークと相互作用するしくみをわかりやすく説明するために作成されました。

ウォーターマークの延期期間が 15 分の店舗で 1 時間ごとに生成される合計収益を計算するストリーミング集計について考えてみましょう。 最初のマイクロバッチは、次のレコードを処理します。

  • 午後 2 時 40 分に $15
  • 午後 2 時 30 分に $10
  • 午後 3 時 10 分に $30

この時点では、エンジンのウォーターマークは午後 2 時 55 分です。これは、表示される最大時間 (午後 3 時 10 分) から 15 分 (延期期間) を減算するためです。 ストリーミング集計演算子の状態は次のとおりです。

  • [2pm, 3pm]: $25
  • [3pm, 4pm]: $30

次の表は、各出力モードで起きる内容を示しています。

出力モード 結果と理由
追加 ストリーミング集計演算子は、ダウンストリームに何も出力しません。 これは、新しい値が後続のトリガーと共に表示されると、両方のウィンドウが変更される可能性があるためです。午後 2 時 55 分のウォーターマークは、午後 2 時 55 分以降のレコードが引き続き到着し、それらのレコードが [2pm, 3pm] ウィンドウまたは [3pm, 4pm] ウィンドウのいずれかに分類される可能性があることを示します。
更新する 両方のレコードが更新を受信したため、演算子により両方のレコードが出力されます。
完了 演算子は、すべてのレコードを出力します。

次に、ストリームがもう 1 つのレコードを受信するとします。

  • 午後 3 時 20 分に $20

エンジンが午後 3 時 20 分から 15 分減算するため、ウォーターマークは午後 3 時 5 分に更新されます。 この時点で、ストリーミング集計演算子の状態は次のようになります。

  • [2pm, 3pm]: $25
  • [3pm, 4pm]: $50

次の表は、各出力モードで起きる内容を示しています。

出力モード 結果と理由
追加 ストリーミング集計演算子により、午後 3 時 5 分のウォーターマークが [2pm, 3pm] ウィンドウの末尾より大きいことが観察されます。 ウォーターマークの定義により、そのウィンドウは変更できなくなり、[2pm, 3pm] ウィンドウが出力されます。
更新する 状態値が $30 から $50 に変更されたため、ストリーミング集計演算子により [3pm, 4pm] ウィンドウが出力されます。
完了 演算子は、すべてのレコードを出力します。

次に、各追加モードでのステートフル演算子の動作の概要を示します。

  • 追加モードでは、ウォーターマークの延期期間の後にレコードを 1 回書き込みます。
  • 更新モードでは、前のトリガー以降に変更されたレコードを書き込みます。
  • 完全モードでは、ステートフル演算子によって生成されたすべてのレコードを書き込みます。