APPLY CHANGES API: Delta Live Tables を使用した変更データ キャプチャの簡略化
Delta Live Tables で APPLY CHANGES
と APPLY CHANGES FROM SNAPSHOT
API を使うと、変更データ キャプチャ (CDC) が簡単になります。 使用するインターフェイスは、変更データのソースによって異なります。
- 変更データ フィード (CDF) からの変更を処理するには、
APPLY CHANGES
を使用します。 - データベース スナップショットの変更を処理するには、
APPLY CHANGES FROM SNAPSHOT
(パブリック プレビュー) を使用します。
これまで、Azure Databricks で CDC レコードを処理するには、MERGE INTO
ステートメントが一般に使われていました。 しかし、MERGE INTO
では、順序が正しくないレコードのために誤った結果が生成される場合や、レコードを並べ替えるために複雑なロジックが必要になる場合があります。
APPLY CHANGES
API は、Delta Live Tables SQL および Python インターフェイスでサポートされています。 APPLY CHANGES FROM SNAPSHOT
API は、Delta Live Tables Python インターフェイスでサポートされています。
APPLY CHANGES
と APPLY CHANGES FROM SNAPSHOT
の両方で、SCD タイプ 1 とタイプ 2 を使用したテーブルの更新がサポートされます。
- レコードを直接更新するには、SCD タイプ 1 を使用します。 更新されたレコードの履歴は保持されません。
- SCD タイプ 2 を使用して、すべての更新または指定された列のセットの更新時にレコードの履歴を保持します。
構文とその他の参考資料については、以下を参照してください。
Note
この記事では、ソース データの変更に基づいて Delta Live Tables パイプラインのテーブルを更新する方法について説明します。 Delta テーブルの行レベルの変更情報を記録およびクエリする方法については、「Azure Databricks で Delta Lake 変更データ フィードを使用する」をご覧ください。
要件
CDC API を使用するには、サーバーレス DLT パイプラインあるいは Delta Live Tables の Pro
または Advanced
エディションを使用するようにパイプラインを構成する必要があります。
APPLY CHANGES
API ではどのように CDC が実装されますか?
Delta Live Tables の APPLY CHANGES
API を使うと、正しくない順序のレコードが自動的に処理されるので、CDC レコードの正しい処理が保証され、正しくない順序のレコードを処理するために複雑なロジックを開発する必要がなくなります。 レコードのシーケンス処理に使用するソース データ内の列を指定する必要があります。Delta Live Tables は、ソース データの適切な順序付けを単調増加する表現として解釈します。 Delta Live Tables は、順不同で到着したデータを自動的に処理します。 SCD タイプ 2 の変更の場合、Delta Live Tables では、ターゲット テーブルの __START_AT
および __END_AT
列に適切なシーケンス値が反映されます。 各シーケンス値ではキーごとに 1 つの個別の更新が必要であり、NULL シーケンス値はサポートされていません。
APPLY CHANGES
で CDC 処理を実行するには、まずストリーミング テーブルを作成してから、SQL の APPLY CHANGES INTO
ステートメントまたは Python の apply_changes()
関数を使用して、変更フィードのソース、キー、シーケンスを指定します。 ターゲット ストリーミング テーブルを作成するには、SQL の CREATE OR REFRESH STREAMING TABLE
ステートメントまたは Python の create_streaming_table()
関数を使用します。 SCD タイプ 1 とタイプ 2 の処理の例を参照してください。
構文の詳細については、Delta Live Tables の「SQL リファレンス」または「Python リファレンス」を参照してください。
APPLY CHANGES FROM SNAPSHOT
API ではどのように CDC が実装されますか?
重要
APPLY CHANGES FROM SNAPSHOT
API はパブリック プレビュー段階です。
APPLY CHANGES FROM SNAPSHOT
は、一連の順番どおりのスナップショットを比較してソース データの変更を効率的に判断し、スナップショット内のレコードの CDC 処理に必要な処理を実行する宣言型 API です。 APPLY CHANGES FROM SNAPSHOT
は、Delta Live Tables Python インターフェイスでのみサポートされます。
APPLY CHANGES FROM SNAPSHOT
では、複数のソースの種類からのスナップショットの取り込みがサポートされています。
- 定期的なスナップショット インジェストを使用して、既存のテーブルまたはビューからスナップショットを取り込みます。
APPLY CHANGES FROM SNAPSHOT
には、既存のデータベース オブジェクトからのスナップショットの定期的な取り込みをサポートするシンプルで合理化されたインターフェイスがあります。 パイプラインの更新ごとに新しいスナップショットが取り込まれ、取り込み時間がスナップショット バージョンとして使用されます。 パイプラインを連続モードで実行すると、APPLY CHANGES FROM SNAPSHOT 処理を含むフローのトリガー間隔設定によって決定された期間に、パイプラインの更新ごとに複数のスナップショットが取り込まれます。 - 履歴スナップショット インジェストを使用して、Oracle または MySQL データベースあるいはデータ ウェアハウスから生成されたスナップショットなど、データベース スナップショットを含むファイルを処理します。
APPLY CHANGES FROM SNAPSHOT
を使って任意のソースの種類から CDC 処理を実行するには、まずストリーミング テーブルを作成してから、Python で apply_changes_from_snapshot()
関数を使用して、処理を実装するために必要なスナップショット、キー、およびその他の引数を指定します。 定期的なスナップショット インジェストと履歴スナップショット インジェストの例を参照してください。
API に渡されるスナップショットは、バージョン別の昇順である必要があります。 Delta Live Tables で順序が正しくないスナップショットが検出されると、エラーがスローされます。
構文の詳細については、Delta Live Tables の「Python リファレンス」を参照してください。
制限事項
シーケンス処理に使用する列は、並べ替え可能なデータ型である必要があります。
例: CDF ソース データを使用した SCD タイプ 1 と SCD タイプ 2 の処理
次のセクションでは、以下を行う変更データ フィードからのソース イベントに基づいてターゲット テーブルを更新する Delta Live Tables SCD タイプ 1 とタイプ 2 のクエリの例を示します。
- 新しいユーザー レコードを作成します。
- ユーザー レコードを削除します。
- ユーザー レコードを更新します。 SCD タイプ 1 の例では、最後の
UPDATE
操作が遅れて到着し、ターゲット テーブルからドロップされます。これは順不同のイベントの処理を示しています。
次の例は、Delta Live Tables パイプラインの構成と更新に精通していることを前提としています。 「チュートリアル: 最初の Delta Live Tables パイプラインを実行する」を参照してください。
これらの例を実行するには、サンプル データセットを作成することから始める必要があります。 「テスト データを生成する」を参照してください。
これらの例の入力レコードは以下の通りです。
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
124 | Raul | Oaxaca | INSERT | 1 |
123 | Isabel | Monterrey | INSERT | 1 |
125 | Mercedes | Tijuana | INSERT | 2 |
126 | Lily | Cancun | INSERT | 2 |
123 | null | null | DELETE | 6 |
125 | Mercedes | Guadalajara | UPDATE | 6 |
125 | Mercedes | Mexicali | UPDATE | 5 |
123 | Isabel | Chihuahua | UPDATE | 5 |
サンプル データの最後の行のコメントを解除すると、レコードを切り捨てる場所を指定する次のレコードが挿入されます。
userId | name | city | operation | sequenceNum |
---|---|---|---|---|
null | null | null | TRUNCATE | 3 |
Note
次のすべての例には、DELETE
と TRUNCATE
の両方の操作を指定するオプションが含まれていますが、それぞれ省略可能です。
SCD タイプ 1 の更新を処理する
次の例は、SCD タイプ 1 の更新の処理を示しています。
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
SCD タイプ 1 の例でクエリを実行した後、ターゲット テーブルには次のレコードが含まれます。
userId | name | city |
---|---|---|
124 | Raul | Oaxaca |
125 | Mercedes | Guadalajara |
126 | Lily | Cancun |
追加の TRUNCATE
レコードで SCD タイプ 1 の例を実行すると、124
および 126
は sequenceNum=3
での TRUNCATE
操作のためにレコードが切り捨てられ、ターゲット テーブルに次のレコードが含まれます。
userId | name | city |
---|---|---|
125 | Mercedes | Guadalajara |
SCD タイプ 2 の更新を処理する
次の例は、SCD タイプ 2 の更新の処理を示しています。
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
SCD タイプ 2 の例でクエリを実行した後、ターゲット テーブルには次のレコードが含まれます。
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Monterrey | 1 | 5 |
123 | Isabel | Chihuahua | 5 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Tijuana | 2 | 5 |
125 | Mercedes | Mexicali | 5 | 6 |
125 | Mercedes | Guadalajara | 6 | null |
126 | Lily | Cancun | 2 | null |
SCD タイプ 2 クエリでは、ターゲット テーブル内の履歴に対して、追跡する出力列のサブセットを指定することもできます。 他の列への変更は、新しい履歴レコードを生成するのではなく、上書きでレコードが更新されます。 次の例では、city
列の追跡からの除外を示します。
次の例は、SCD タイプ 2 で履歴追跡を使用する方法を示しています。
Python
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
APPLY CHANGES INTO
live.target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
この例を追加の TRUNCATE
レコードなしで実行した後、そのターゲット テーブルには次のレコードが含まれます。
userId | name | city | __START_AT | __END_AT |
---|---|---|---|---|
123 | Isabel | Chihuahua | 1 | 6 |
124 | Raul | Oaxaca | 1 | null |
125 | Mercedes | Guadalajara | 2 | null |
126 | Lily | Cancun | 2 | null |
テスト データを生成する
次のコードは、このチュートリアルにあるサンプル クエリで使用するサンプル データセットを生成する目的で提供されています。 新しいスキーマを作成し、新しいテーブルを作成するための適切な資格情報を持っていれば、ノートブックまたは Databricks SQL を使用して、これらのステートメントを実行できます。 次のコードは、Delta Live Tables パイプラインの一部として実行することを意図したものではありません。
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
例: 定期的なスナップショットの処理
次の例は、mycatalog.myschema.mytable
に格納されているテーブルのスナップショットを取り込む SCD タイプ 2 の処理を示しています。 処理の結果は、target
という名前のテーブルに書き込まれます。
タイムスタンプ 2024-01-01 00:00:00 の mycatalog.myschema.mytable
レコード
Key | 値 |
---|---|
1 | a1 |
2 | a2 |
タイムスタンプ 2024-01-01 12:00:00 の mycatalog.myschema.mytable
レコード
Key | Value |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.apply_changes_from_snapshot(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
スナップショットの処理後、ターゲット テーブルには次のレコードが含まれます。
Key | Value | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 2024-01-01 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | null |
3 | a3 | 2024-01-01 12:00:00 | null |
例: 履歴スナップショットの処理
次の例は、クラウド ストレージ システムに格納されている 2 つのスナップショットからのソース イベントに基づいてターゲット テーブルを更新する SCD タイプ 2 の処理を示しています。
/<PATH>/filename1.csv
に格納されている timestamp
のスナップショット
キー | TrackingColumn | NonTrackingColumn |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
/<PATH>/filename2.csv
に格納されている timestamp + 5
のスナップショット
キー | TrackingColumn | NonTrackingColumn |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
4 | a4 | b4_new |
次のコード例は、これらのスナップショットを使用した SCD タイプ 2 の更新の処理を示しています。
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.apply_changes_from_snapshot(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
スナップショットの処理後、ターゲット テーブルには次のレコードが含まれます。
キー | TrackingColumn | NonTrackingColumn | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | null |
3 | a3 | b3 | 2 | null |
4 | a4 | b4_new | 1 | null |
ターゲット ストリーミング テーブルのデータを追加、変更、または削除する
パイプラインがテーブルを Unity Catalog に発行する場合は、挿入ステートメント、更新ステートメント、削除ステートメント、マージ ステートメントなどのデータ操作言語 (DML) ステートメントを使用して、APPLY CHANGES INTO
ステートメントによって作成されたターゲット ストリーミング テーブルを変更することができます。
Note
- ストリーミング テーブルのテーブル スキーマを変更する DML ステートメントはサポートされていません。 DML ステートメントがテーブル スキーマの進化を試みないことを確認します。
- ストリーミング テーブルを更新する DML ステートメントは、Databricks Runtime 13.3 LTS 以降を使用する共有 Unity Catalog クラスターまたは SQL ウェアハウスでのみ実行できます。
- ストリーミングには追加専用のデータソースが必要なため、処理で (DML ステートメントなどによる) 変更を伴うソース ストリーミング テーブルからのストリーミングが必要な場合は、ソース ストリーミング テーブルを読み取るときに skipChangeCommits フラグを設定します。
skipChangeCommits
が設定されていれば、ソース テーブルのレコードを削除または変更するトランザクションは無視されます。 処理でストリーミング テーブルが必要ない場合は、(追加専用の制限がない) 具体化されたビューをターゲット テーブルとして使用できます。
Delta Live Tables は指定された SEQUENCE BY
列を使用し、ターゲット テーブルの __START_AT
列と __END_AT
列に適切なシーケンス値を反映させるため (SCD タイプ 2 の場合)、DML ステートメントがこれらの列に有効な値を使用して、レコードの適切な順序付けを維持する必要があります。 「APPLY CHANGES API ではどのように CDC が実装されますか?」を参照してください。
ストリーミング テーブルで DML ステートメントを使用する方法の詳細については、「ストリーミング テーブルのデータを追加、変更、または削除する」を参照してください。
次の例では、開始シーケンスが 5 のアクティブ なレコードを挿入します。
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
APPLY CHANGES
ターゲット テーブルから変更データ フィードを読み取る
Databricks Runtime 15.2 以降では、他の Delta テーブルから変更データ フィードを読み取るのと同じ方法で、APPLY CHANGES
クエリまたは APPLY CHANGES FROM SNAPSHOT
クエリのターゲットであるストリーミング テーブルから変更データ フィードを読み取ることができます。 ターゲット ストリーミング テーブルから変更データ フィードを読み取るには、次のものが必要です。
- ターゲット ストリーミング テーブルは、Unity Catalog に公開する必要があります。 「Unity Catalog を Delta Live Tables パイプラインで使う」を参照してください。
- ターゲット ストリーミング テーブルから変更データ フィードを読み取るには、Databricks Runtime 15.2 以降を使用する必要があります。 別の Delta Live テーブル パイプラインで変更データ フィードを読み取るためには、Databricks Runtime 15.2 以降を使用するようにパイプラインを構成する必要があります。
他のDelta テーブルから変更データフィードを読み取るのと同じように、Delta Live テーブル パイプラインで作成されたターゲットストリーミングテーブルから変更データフィードを読み取ります。 Python と SQL の例を含む、Delta 変更データ フィード機能の使用の詳細については、「Azure Databricks での Delta Lake 変更データ フィードの使用」を参照してください。
Note
変更データ フィード レコードには、変更イベントの種類を識別する メタデータ が含まれます。 テーブル内のレコードが更新されると、関連付けられている変更レコードのメタデータには、通常、update_preimage
と update_postimage
のイベントに設定された _change_type
値が含まれます。
ただし、主キー値の変更を含むターゲット ストリーミング テーブルに対して更新が行われる場合は、_change_type
値は異なります。 変更に主キーの更新が含まれている場合、_change_type
メタデータ フィールドは insert
と delete
のイベントに設定されます。 主キーに対する変更は、UPDATE
または MERGE
ステートメントを使用していずれかのキー フィールドに対して手動で更新を行った場合、または SCD タイプ 2 テーブルの場合、__start_at
フィールドが以前の開始シーケンス値を反映するように変更された場合に発生する可能性があります。
APPLY CHANGES
クエリは、SCD タイプ 1 と SCD タイプ 2 の処理で異なる主キー値を決定します。
- SCD タイプ 1 の処理と Delta Live テーブル Python インターフェイスの場合、主キーは
apply_changes()
関数のkeys
パラメーターの値です。 Delta Live テーブル SQL インターフェイスの場合は、主キーはAPPLY CHANGES INTO
ステートメントのKEYS
句によって定義された列です。 - SCD 型 2 の場合、主キーは
keys
パラメーターまたはKEYS
句にcoalesce(__START_AT, __END_AT)
操作からの戻り値を加えたものになり、ここで、__START_AT
と__END_AT
はターゲット ストリーミング テーブルの対応する列です。
Delta Live Tables CDC クエリで処理されたレコードに関するデータを取得する
Note
次のメトリックは、APPLY CHANGES FROM SNAPSHOT
クエリではなく、APPLY CHANGES
クエリによってのみキャプチャされます。
次のメトリックは APPLY CHANGES
クエリによってキャプチャされます。
num_upserted_rows
: 更新中にそのデータセットにアップサートされた出力行の数。num_deleted_rows
: 更新中にそのデータセットから削除された既存の出力行の数。
非 CDC フローの出力である num_output_rows
メトリックは、apply changes
のクエリではキャプチャされません。
Delta Live Tables の CDC 処理には、どのようなデータ オブジェクトが使用されますか?
注: 次のデータ構造は、APPLY CHANGES FROM SNAPSHOT
処理ではなく、APPLY CHANGES
処理にのみ適用されます。
Hive メタストアでターゲット テーブルを宣言すると、次の 2 つのデータ構造が作成されます。
- ターゲット テーブルに割り当てられた名前を使用するビュー。
- Delta Live Tables が CDC 処理の管理に使用する内部バッキング テーブル。 このテーブルには、ターゲット テーブル名の前に
__apply_changes_storage_
を付加した名前が付けられます。
たとえば、dlt_cdc_target
という名前のターゲット テーブルを宣言すると、メタストア内に dlt_cdc_target
という名前のビューと __apply_changes_storage_dlt_cdc_target
という名前のテーブルが表示されます。 ビューを作成すると、Delta Live Tables は、順不同のデータの処理に必要な追加情報 (廃棄標識やバージョンなど) を除外できます。 処理されたデータを表示するには、ターゲット ビューに対してクエリを実行します。 __apply_changes_storage_
テーブルのスキーマは、将来の機能や機能強化をサポートするために変更される可能性があるため、運用環境向けのテーブルに対してクエリを実行しないでください。 テーブルにデータを手動で追加すると、バージョン列がないため、レコードは他の変更の前にあると見なされます。
パイプラインで Unity Catalog に発行する場合、ユーザーは内部バッキング テーブルにアクセスできません。