マージを使用した Delta Lake テーブルへの upsert
ソース テーブル、ビュー、または DataFrame のデータをターゲット Delta テーブルにアップサートするには、MERGE
SQL 操作を使用します。 Delta Lake では、MERGE
での挿入、更新、削除がサポートされ、高度なユース ケースを容易にするために、SQL 標準を超える拡張構文がサポートされています。
people10mupdates
という名前のソース テーブル、または people10m
という名前を持つターゲット テーブルの新しいデータを含む /tmp/delta/people-10m-updates
のソース パス、または /tmp/delta/people-10m
のターゲット パスがある場合を考えます。 これらの新しいレコードの一部は、ターゲット データに既に存在している可能性があります。 新しいデータをマージするには、ユーザーの id
が既に存在する行を更新し、一致する id
がない新しい行を挿入します。 次のクエリを実行できます。
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
重要
ターゲット テーブル内の特定の行と一致できるのは、ソース テーブルの 1 つの行だけです。 Databricks Runtime 16.0 以降では、 MERGE
and ON
句で指定された条件をWHEN MATCHED
評価して、重複する一致を判断します。 Databricks Runtime 15.4 LTS 以下では、 MERGE
操作では句で ON
指定された条件のみが考慮されます。
Scala、Python 構文の詳細については、Delta Lake API ドキュメントを参照してください。 SQL 構文の詳細については、「MERGE INTO」を参照してください
マージを使用して一致しないすべての行を変更する
Databricks SQL および Databricks Runtime 12.2 LTS 以降では、WHEN NOT MATCHED BY SOURCE
句を、ソース テーブルに対応するレコードがないターゲット テーブル内の UPDATE
または DELETE
レコードに使用することができます。 Databricks では、ターゲット テーブルが完全に書き直されないように、オプションの条件句を追加することを推奨しています。
次のコード例は、これを削除に使用する基本的な構文を示しており、ターゲット テーブルをソース テーブルの内容で上書きし、ターゲット テーブルの一致しないレコードを削除するものです。 ソースの更新や削除に期限があるテーブルのよりスケーラブルなパターンについては、「Delta テーブルをソースと増分同期する」を参照してください。
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
次の例では、WHEN NOT MATCHED BY SOURCE
句に条件を追加し、一致しないターゲット行で更新する値を指定しています。
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
マージ操作のセマンティクス
merge
のプログラム操作のセマンティクスに関する詳しい説明を次に示します。
whenMatched
句とwhenNotMatched
句は任意の数にできます。whenMatched
句は、一致条件に基づいてソース行がターゲット テーブル行と一致する場合に実行されます。 これらの句のセマンティクスは次のとおりです。whenMatched
句には、最大で 1 つのupdate
とdelete
アクションを含めることができます。merge
のupdate
アクションは、一致するターゲット行の指定された列 (update
操作に類似) のみを更新します。delete
アクションによって、一致した行が削除されます。各
whenMatched
句には、省略可能な条件を指定できます。 この句条件が存在する場合、update
またはdelete
アクションは、句条件が true の場合にのみ、一致するソースとターゲットの行ペアに対して実行されます。whenMatched
句が複数ある場合は、指定された順序で評価されます。 最後のものを除くすべてのwhenMatched
句には条件が必要です。マージ条件に一致するソースとターゲットの行のペアに対して、どの
whenMatched
条件も true と評価されない場合、ターゲット行はそのままです。ソース データセットの対応する列を使用して、ターゲット Delta テーブルのすべての列を更新するには、
whenMatched(...).updateAll()
を使用します。 これは、次の内容と同じです。whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
ターゲット Delta テーブルのすべての列に対して該当します。 したがって、このアクションでは、ソース テーブルの列がターゲット テーブルの列と同じことを前提とします。それ以外の場合、クエリは分析エラーをスローします。
Note
この動作は、スキーマの自動展開が有効になっている場合に変わります。 詳細については、スキーマの自動展開に関する内容を参照してください。
whenNotMatched
clauses are executed when a source row does not match any target row based on the match condition. これらの句のセマンティクスは次のとおりです。whenNotMatched
句にはinsert
アクションのみを含められます。 新しい行は、指定された列とそれに対応する式に基づいて生成されます。 ターゲット テーブル内のすべての列を指定する必要はありません。 指定されていないターゲット列の場合は、NULL
が挿入されます。各
whenNotMatched
句には、省略可能な条件を指定できます。 句の条件が存在する場合、その条件が該当する行に対して true である場合にのみ、ソース行が挿入されます。 それ以外の場合、ソース列は無視されます。whenNotMatched
句が複数ある場合は、指定された順序で評価されます。 最後のものを除くすべてのwhenNotMatched
句には条件が必要です。ソース データセットの対応する列を使用して、ターゲット Delta テーブルのすべての列を挿入するには、
whenNotMatched(...).insertAll()
を使用します。 これは、次の内容と同じです。whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
ターゲット Delta テーブルのすべての列に対して該当します。 したがって、このアクションでは、ソース テーブルの列がターゲット テーブルの列と同じことを前提とします。それ以外の場合、クエリは分析エラーをスローします。
Note
この動作は、スキーマの自動展開が有効になっている場合に変わります。 詳細については、スキーマの自動展開に関する内容を参照してください。
whenNotMatchedBySource
句は、一致条件に基づいてターゲット行がどのソース行とも一致しない場合に実行されます。 これらの句のセマンティクスは次のとおりです。whenNotMatchedBySource
句では、delete
およびupdate
アクションを指定できます。- 各
whenNotMatchedBySource
句には、省略可能な条件を指定できます。 句の条件が存在する場合、その条件が該当する行に対して true である場合にのみ、ターゲット行が変更されます。 それ以外の場合、ターゲット行は変更されません。 whenNotMatchedBySource
句が複数ある場合は、指定された順序で評価されます。 最後のものを除くすべてのwhenNotMatchedBySource
句には条件が必要です。- 定義上、
whenNotMatchedBySource
句には列の値をプルするソース行がないため、ソース列を参照することはできません。 変更する各列について、リテラルを指定するか、SET target.deleted_count = target.deleted_count + 1
など、ターゲット列に対してアクションを実行できます。
重要
- ソース データセットの複数の行が一致し、マージがターゲット Delta テーブルの同じ行を更新しようとすると、
merge
操作が失敗する可能性があります。 マージの SQL セマンティクスによれば、一致するターゲット行を更新するために使用するソース行が明確でないため、このような更新操作はあいまいです。 ソース テーブルを前処理して、複数の一致が発生する可能性をなくすことができます。 - ビューが
CREATE VIEW viewName AS SELECT * FROM deltaTable
として定義されている場合にのみ、SQL VIEW に対してSQLMERGE
操作を適用できます。
Delta テーブルに書き込む場合のデータ重複排除
一般的な ETL の使用例では、ログをテーブルに追加して Delta テーブルに収集します。 しかし、多くの場合、ソースは重複するログ レコードを生成する場合があり、それらを処理するためにダウンストリームの重複排除手順が必要になります。 merge
を使用すると、重複するレコードの挿入を回避できます。
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
注意
新しいログを含むデータセットは、それ自体の中で重複を排除する必要があります。 マージの SQL セマンティクスによって、新しいデータをテーブル内の既存のデータと一致させて重複排除が行われますが、新しいデータセット内に重複するデータがある場合はそれが挿入されます。 そのため、テーブルにマージする前に、新しいデータを重複排除します。
数日間だけ重複レコードが取得される可能性がある場合は、日付でテーブルをパーティション分割し、一致させる対象テーブルの日付範囲を指定すると、クエリをさらに最適化できます。
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
これは、テーブル全体ではなく、過去 7 日間のログでのみ重複を検索するため、前のコマンドよりも効率的です。 さらに、この挿入専用マージを構造化ストリーミングと共に使用して、ログの継続的重複排除を実行できます。
- ストリーミング クエリでは、
foreachBatch
のマージ操作を使用して、重複排除を設定して Delta テーブルにストリーミング データを継続的に書き込めます。foreachBatch
の詳細については、次のストリーミングの例を参照してください。 - 別のストリーミング クエリでは、この Delta テーブルから重複排除されたデータを継続的に読み取りできます。 これは、挿入専用のマージでは、Delta テーブルに新しいデータのみが追加されるため可能になります。
Delta Lake を使用して緩やかに変化するデータ (SCD) と変更データ キャプチャ (CDC)
Delta Live Tables では、SCD タイプ 1 およびタイプ 2 の追跡と適用をネイティブにサポートしています。 Delta Live Tables と共に APPLY CHANGES INTO
を使用して、CDC フィードの処理中に順序が正しく処理されるようにします。 APPLY CHANGES API: Delta Live Tables を使用した変更データ キャプチャの簡略化に関する記事を参照してください。
Delta テーブルをソースと増分同期する
Databricks SQL および Databricks Runtime 12.2 LTS 以降では、WHEN NOT MATCHED BY SOURCE
を使用して、テーブルの一部をアトミックに削除したり置き換えたりする任意の条件を作成できます。 これは、最初のデータ入力後に数日間レコードが変更または削除される可能性があるが、最終的に最終状態に落ち着くようなソース テーブルがある場合に特に便利です。
次のクエリは、このパターンを使用して、ソースから 5 日間のレコードを選択し、ターゲットの一致するレコードを更新し、ソースからターゲットに新しいレコードを挿入し、ターゲットの過去 5 日間の一致しないレコードをすべて削除する方法を示しています。
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
ソース テーブルとターゲット テーブルに同じブール値フィルターを指定することで、ソースからターゲット テーブルへ、削除を含む変更を動的に反映できます。
注意
このパターンは条件句なしで使用することもできますが、その場合、ターゲット テーブルを完全に書き換えることになり、コストがかかる可能性があります。