次の方法で共有


ウォーターマークを使用して Delta Live Tables のステートフル処理を最適化する

状態に保持されているデータを効果的に管理するには、Delta Live Tables でステートフル ストリーム処理 (集計、結合、重複除去など) を実行するときにウォーターマークを使用します。 この記事では、Delta Live Tables クエリでウォーターマークを使用する方法について説明し、推奨される操作の例を示します。

Note

集計を実行するクエリが増分処理され、更新ごとに完全に再計算されないようにするには、ウォーターマークを使用する必要があります。

透かしとは

ストリーム処理では、"ウォーターマーク" は、集計などのステートフル操作を実行するときにデータを処理するための時間ベースのしきい値を定義できる Apache Spark 機能です。 到着したデータは、しきい値に達するまで処理され、その時点でしきい値によって定義された時間枠が閉じられます。 ウォーターマークを使用すると、クエリの処理中に、主に大規模なデータセットの処理や実行時間の長い処理を行う場合の問題を回避できます。 このような問題には、結果が出るまでの待機時間が長いことや、状態に保持されるデータ量が多いためにメモリ不足 (OOM) エラーが発生することが含まれることもあります。 ストリーミング データは本質的に順序付けされないため、ウォーターマークは時間枠の集計などの正しい計算操作もサポートします。

ストリーム処理でウォーターマークを使用する方法の詳細については、「Apache Spark Structured Streaming でのウォーターマーク」および「データ処理のしきい値を制御するためのウォーターマークの適用」を参照してください。

ウォーターマークを定義する方法

ウォーターマークを定義するには、タイムスタンプ フィールドと、到着 "遅延データ" の時間しきい値を表す値を指定します。 定義された時間しきい値の後に到着した場合、データは遅延と見なされます。 たとえば、しきい値が 10 分として定義されている場合、10 分のしきい値の後に到着したレコードは削除される場合があります。

定義されたしきい値の後に到着するレコードは削除される可能性があるため、待機時間と正確性の要件を満たすしきい値を選択することが重要です。 しきい値を小さくすると、レコードの出力が早くなりますが、遅延レコードが削除される可能性が高くなります。 しきい値を大きくすると、待機時間は長くなりますが、データの完全性が高くなる可能性があります。 しきい値を大きくすると、状態サイズが大きいため、追加のコンピューティング リソースが必要になる場合もあります。 しきい値はデータと処理の要件に依存するため、最適なしきい値を決定するには、処理のテストと監視が重要です。

ウォーターマークを定義するには、Python の withWatermark() 関数を使います。 SQL では、WATERMARK 句を使用してウォーターマークを定義します。

Python

withWatermark("timestamp", "3 minutes")

SQL

WATERMARK timestamp DELAY OF INTERVAL 3 MINUTES

ストリーム間の結合でウォーターマークを使用する

ストリーム間の結合の場合は、結合の両側にウォーターマークと時間間隔句を定義する必要があります。 各結合ソースにはデータの不完全なビューがあるため、それ以上一致できない場合にストリーミング エンジンに通知するには、時間間隔句が必要です。 時間間隔句では、ウォーターマークの定義に使用するのと同じフィールドを使用する必要があります。

各ストリームでウォーターマークに異なるしきい値が必要になる場合があるため、ストリームに同じしきい値を設定する必要はありません。 データの欠落を避けるため、ストリーミング エンジンは、最も低速なストリームに基づいて 1 つのグローバルウォーターマークを保持します。

次の例では、広告インプレッションのストリームと、ユーザーが広告をクリックしたストリームを結合します。 この例では、クリックはインプレッションから 3 分以内に発生する必要があります。 3 分の時間間隔が経過すると、一致しなくなった状態の行が削除されます。

Python

import dlt

dlt.create_streaming_table("adImpressionClicks")
@dlt.append_flow(target = "adImpressionClicks")
def joinClicksAndImpressions():
  clicksDf = (read_stream("rawClicks")
    .withWatermark("clickTimestamp", "3 minutes")
  )
  impressionsDf = (read_stream("rawAdImpressions")
    .withWatermark("impressionTimestamp", "3 minutes")
  )
  joinDf = impressionsDf.alias("imp").join(
  clicksDf.alias("click"),
  expr("""
    imp.userId = click.userId AND
    clickAdId = impressionAdId AND
    clickTimestamp >= impressionTimestamp AND
    clickTimestamp <= impressionTimestamp + interval 3 minutes
  """),
  "inner"
  ).select("imp.userId", "impressionAdId", "clickTimestamp", "impressionSeconds")

  return joinDf

SQL

CREATE OR REFRESH STREAMING TABLE
  silver.adImpressionClicks
AS SELECT
  imp.userId, impressionAdId, clickTimestamp, impressionSeconds
FROM STREAM
  (LIVE.bronze.rawAdImpressions)
WATERMARK
  impressionTimestamp DELAY OF INTERVAL 3 MINUTES imp
INNER JOIN STREAM
  (LIVE.bronze.rawClicks)
WATERMARK clickTimestamp DELAY OF INTERVAL 3 MINUTES click
ON
  imp.userId = click.userId
AND
  clickAdId = impressionAdId
AND
  clickTimestamp >= impressionTimestamp
AND
  clickTimestamp <= impressionTimestamp + interval 3 minutes

ウォーターマークを使用してウィンドウ集計を実行する

ストリーミング データに対する一般的なステートフル操作は、ウィンドウ集計です。 ウィンドウ集計はグループ化された集計に似ていますが、定義されたウィンドウの一部である行のセットに対して集計値が返される点が異なります。

ウィンドウは特定の長さとして定義でき、そのウィンドウの一部であるすべての行に対して集計操作を実行できます。 Spark ストリーミングでは、次の 3 種類のウィンドウがサポートされています。

  • タンブリング (固定) ウィンドウ: 固定サイズで重複しない一連の連続する時間間隔です。 入力レコードは 1 つのウィンドウにのみ属します。
  • スライディング ウィンドウ: タンブリング ウィンドウと同様に、スライディング ウィンドウは固定サイズですが、ウィンドウが重複してレコードが複数のウィンドウに分類される場合があります。

データがウィンドウの末尾とウォーターマークの長さを超えて到着すると、そのウィンドウで新しいデータは受け入れられず、集計の結果が出力され、ウィンドウの状態が削除されます。

次の例では、固定ウィンドウを使用して 5 分ごとにインプレッションの合計を計算します。 この例では、select 句はエイリアス impressions_windowを使用し、ウィンドウ自体は GROUP BY 句の一部として定義されています。 ウィンドウは、ウォーターマークと同じタイムスタンプ列 (この例の clickTimestamp 列) に基づいている必要があります。

CREATE OR REFRESH STREAMING TABLE
  gold.adImpressionSeconds
AS SELECT
  impressionAdId, impressions_window, sum(impressionSeconds) as totalImpressionSeconds
FROM STREAM
  (LIVE.silver.adImpressionClicks)
WATERMARK
  clickTimestamp DELAY OF INTERVAL 3 MINUTES
GROUP BY
  impressionAdId, window(clickTimestamp, "5 minutes")

Python の同様の例では、時間単位の固定ウィンドウで利益を計算します。

import dlt

@dlt.table()
def profit_by_hour():
  return (
    spark.readStream.table("sales")
      .withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "1 hour").alias("time"))
      .aggExpr("sum(profit) AS profit")
  )

ストリーミング レコードの重複除去

構造化ストリーミングでは、1 回だけ処理が保証されますが、データ ソースからレコードが自動的に重複除去されることはありません。 たとえば、多くのメッセージ キューには少なくとも 1 回の保証があるため、これらのメッセージ キューの 1 つから読み取るときに重複するレコードが想定されるはずです。 dropDuplicatesWithinWatermark() 関数を使用すると、指定したフィールドのレコードを重複除去できます。これにより、一部のフィールドが異なる場合 (イベント時刻や到着時刻など) でも、ストリームから重複を削除できます。 この dropDuplicatesWithinWatermark() 関数を使用するには、ウォーターマークを指定する必要があります。 ウォーターマークで指定された時間内に到着した重複データはすべて削除されます。

順序が正しくないデータは、ウォーターマークの値が誤って先にジャンプしてしまう原因になるため、順序付けされたデータは重要です。 その後、より古いデータが到着すると、遅延と見なされ、削除されます。 withEventTimeOrder オプションを使用して、ウォーターマークで指定されたタイムスタンプに基づいて初期スナップショットを順番に処理します。 withEventTimeOrder オプションは、データセットを定義するコードまたは spark.databricks.delta.withEventTimeOrder.enabled を使用してパイプライン設定で宣言できます。 次に例を示します。

{
  "spark_conf": {
    "spark.databricks.delta.withEventTimeOrder.enabled": "true"
  }
}

Note

withEventTimeOrder オプションは Python でのみサポートされています。

次の例では、データは clickTimestamp で並べ替えられた順序で処理され、重複する userIdclickAdId 列を含む互いに 5 秒以内に到着したレコードは削除されます。

clicksDedupDf = (
  spark.readStream.table
    .option("withEventTimeOrder", "true")
    .table("LIVE.rawClicks")
    .withWatermark("clickTimestamp", "5 seconds")
    .dropDuplicatesWithinWatermark(["userId", "clickAdId"]))

ステートフル処理用にパイプライン構成を最適化する

運用環境の問題や過剰な待機時間を防ぐために、Databricks では、特に処理で大量の中間状態を保存する必要がある場合に、ステートフル ストリーム処理に対して RocksDB ベースの状態管理を有効にすることをお勧めします。

サーバーレス パイプラインでは、状態ストアの構成が自動的に管理されます。

パイプラインをデプロイする前に次の構成を設定することで、RocksDB ベースの状態管理を有効にすることができます。

{
  "configuration": {
     "spark.sql.streaming.stateStore.providerClass": "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
  }
}

RocksDB の構成に関する推奨事項など、RocksDB 状態ストアの詳細については、「Azure Databricks で RocksDB 状態ストアを構成する」を参照してください。