Delta Lake 테이블에 병합을 사용하여 갱신 및 삽입하기
MERGE
SQL 작업을 사용하여 원본 테이블, 뷰 또는 DataFrame의 데이터를 대상 델타 테이블로 업서트할 수 있습니다. Delta Lake는 삽입, 업데이트 및 삭제를 MERGE
지원하며 고급 사용 사례를 용이하게 하기 위해 SQL 표준을 초과하는 확장 구문을 지원합니다.
people10mupdates
라는 이름의 원본 테이블이나 /tmp/delta/people-10m-updates
에 있는 원본 경로에, people10m
라는 이름의 대상 테이블 또는 /tmp/delta/people-10m
에 있는 대상 경로로 향하는 새 데이터가 있다고 가정합니다. 새 레코드 중 일부는 대상 데이터에 이미 있을 수 있습니다. 새 데이터를 병합하려면 사용자의 id
이미 있는 행을 업데이트하고 일치하는 id
없는 새 행을 삽입하려고 합니다. 다음 쿼리를 실행할 수 있습니다.
SQL
MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
UPDATE SET
id = people10mupdates.id,
firstName = people10mupdates.firstName,
middleName = people10mupdates.middleName,
lastName = people10mupdates.lastName,
gender = people10mupdates.gender,
birthDate = people10mupdates.birthDate,
ssn = people10mupdates.ssn,
salary = people10mupdates.salary
WHEN NOT MATCHED
THEN INSERT (
id,
firstName,
middleName,
lastName,
gender,
birthDate,
ssn,
salary
)
VALUES (
people10mupdates.id,
people10mupdates.firstName,
people10mupdates.middleName,
people10mupdates.lastName,
people10mupdates.gender,
people10mupdates.birthDate,
people10mupdates.ssn,
people10mupdates.salary
)
Python
from delta.tables import *
deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople.alias('people') \
.merge(
dfUpdates.alias('updates'),
'people.id = updates.id'
) \
.whenMatchedUpdate(set =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.whenNotMatchedInsert(values =
{
"id": "updates.id",
"firstName": "updates.firstName",
"middleName": "updates.middleName",
"lastName": "updates.lastName",
"gender": "updates.gender",
"birthDate": "updates.birthDate",
"ssn": "updates.ssn",
"salary": "updates.salary"
}
) \
.execute()
Scala
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()
deltaTablePeople
.as("people")
.merge(
dfUpdates.as("updates"),
"people.id = updates.id")
.whenMatched
.updateExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.whenNotMatched
.insertExpr(
Map(
"id" -> "updates.id",
"firstName" -> "updates.firstName",
"middleName" -> "updates.middleName",
"lastName" -> "updates.lastName",
"gender" -> "updates.gender",
"birthDate" -> "updates.birthDate",
"ssn" -> "updates.ssn",
"salary" -> "updates.salary"
))
.execute()
Important
원본 테이블의 한 행만 대상 테이블의 지정된 행과 일치할 수 있습니다. Databricks Runtime 16.0 이상 MERGE
에서 and WHEN MATCHED
절에 ON
지정된 조건을 평가하여 중복 일치 항목을 확인합니다. Databricks Runtime 15.4 LTS 이하 MERGE
에서 작업은 절에 ON
지정된 조건만 고려합니다.
Scala 및 Python의 구문 세부 사항은 Delta Lake API 문서를 참조하세요. SQL 구문 세부 정보는 MERGE INTO 참조하세요.
병합을 사용하여 일치하지 않는 모든 행 수정
Databricks SQL 및 Databricks Runtime 12.2 LTS 이상 버전에서는, 원본 테이블에 해당하는 레코드가 없는 대상 테이블의 레코드를 WHEN NOT MATCHED BY SOURCE
절을 사용하여 UPDATE
또는 DELETE
할 수 있습니다. Databricks는 대상 테이블을 완전히 다시 작성하지 않도록 선택적 조건부 절을 추가하는 것이 좋습니다.
다음 코드 예제에서는 삭제에 이것을 사용하고, 대상 테이블을 원본 테이블의 내용으로 덮어쓰고, 대상 테이블에서 일치하지 않는 레코드를 삭제하는 기본 구문을 보여 줍니다. 시간에 구속된 원본 업데이트 및 삭제가 있는 테이블에 대한 보다 확장 가능한 패턴은 델타 테이블을 원본과 증분 동기화하는 방법을 참조하세요.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.whenNotMatchedBySourceDelete()
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.whenNotMatchedBySource()
.delete()
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
다음 예제에서는 WHEN NOT MATCHED BY SOURCE
절에 조건을 추가하고 일치하지 않는 대상 행에서 업데이트할 값을 지정합니다.
Python
(targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatchedUpdate(
set = {"target.lastSeen": "source.timestamp"}
)
.whenNotMatchedInsert(
values = {
"target.key": "source.key",
"target.lastSeen": "source.timestamp",
"target.status": "'active'"
}
)
.whenNotMatchedBySourceUpdate(
condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
set = {"target.status": "'inactive'"}
)
.execute()
)
Scala
targetDF
.merge(sourceDF, "source.key = target.key")
.whenMatched()
.updateExpr(Map("target.lastSeen" -> "source.timestamp"))
.whenNotMatched()
.insertExpr(Map(
"target.key" -> "source.key",
"target.lastSeen" -> "source.timestamp",
"target.status" -> "'active'",
)
)
.whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
.updateExpr(Map("target.status" -> "'inactive'"))
.execute()
SQL
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
INSERT (key, lastSeen, status) VALUES (source.key, source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
UPDATE SET target.status = 'inactive'
작업 의미 체계 병합
다음은 프로그래밍 방식 작업 의미 체계에 merge
대한 자세한 설명입니다.
whenMatched
및whenNotMatched
절은 원하는 개수만큼 가질 수 있습니다.whenMatched
절은 원본 행이 일치 조건에 따라 대상 테이블 행과 일치할 때 실행됩니다. 해당 절에는 다음과 같은 의미 체계가 있습니다.whenMatched
절에는 최대 1개의update
작업과 1개의delete
작업이 포함될 수 있습니다.update
작업은merge
에서 일치하는 대상 행의 지정된 열만 업데이트합니다(이는update
작업와 유사합니다).delete
동작은 일치된 행을 삭제합니다.각
whenMatched
절에 선택적 조건이 포함될 수 있습니다. 이 절 조건이 존재하는 경우, 절 조건이 true인 경우에만 일치하는 원본-대상 행 쌍에 대해update
또는delete
동작이 실행됩니다.여러
whenMatched
절이 있는 경우 지정된 순서대로 평가됩니다. 마지막 절을 제외한 모든whenMatched
절에 조건이 있어야 합니다.whenMatched
조건 중 병합 조건과 일치하는 원본-대상 행 쌍에 대해 true인 조건이 없는 경우, 대상 행이 변경되지 않습니다.대상 델타 테이블의 모든 열을 원본 데이터 세트의 해당 열로 업데이트하려면
whenMatched(...).updateAll()
사용합니다. 다음 코드와 동일합니다.whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
대상 델타 테이블의 모든 열에 대해 따라서 이 작업은 원본 테이블에 대상 테이블과 같은 열이 있다고 가정하며, 그렇지 않으면 쿼리 분석 오류를 발생시킵니다.
참고 항목
이 동작은 자동 스키마 진화를 사용하도록 설정하면 변경됩니다. 자세한 내용은 자동 스키마 진화 참조하세요.
whenNotMatched
절은 일치 조건에 따라 원본 행이 대상 행과 일치하지 않을 때 실행됩니다. 해당 절에는 다음과 같은 의미 체계가 있습니다.whenNotMatched
절은 하나의insert
동작만 가질 수 있습니다. 새 행은 지정된 열 및 해당 식을 기반으로 생성됩니다. 대상 테이블의 모든 열을 지정할 필요는 없습니다. 지정되지 않은 대상 열의 경우NULL
삽입됩니다.각
whenNotMatched
절에 선택적 조건이 포함될 수 있습니다. 절 조건이 있는 경우 해당 행에 대한 조건이 true일 때만 원본 행이 삽입됩니다. 그렇지 않으면 원본 열이 무시됩니다.여러
whenNotMatched
절이 있는 경우 지정된 순서대로 평가됩니다. 마지막 절을 제외한 모든whenNotMatched
절에 조건이 있어야 합니다.대상 델타 테이블의 모든 열을 원본 데이터 세트의 해당 열과 함께 삽입하려면
whenNotMatched(...).insertAll()
사용합니다. 다음 코드와 동일합니다.whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
대상 델타 테이블의 모든 열에 대해 따라서 이 작업은 원본 테이블에 대상 테이블과 동일한 열이 있다고 가정하고, 그렇지 않으면 쿼리에서 분석 오류를 발생시킵니다.
참고 항목
이 동작은 자동 스키마 진화를 사용하도록 설정하면 변경됩니다. 자세한 내용은 자동 스키마 진화 참조하세요.
whenNotMatchedBySource
절은 대상 행이 병합 조건에 따라 원본 행과 일치하지 않을 때 실행됩니다. 해당 절에는 다음과 같은 의미 체계가 있습니다.-
whenNotMatchedBySource
절은 지정delete
하고update
작업을 수행할 수 있습니다. - 각
whenNotMatchedBySource
절에 선택적 조건이 포함될 수 있습니다. 절 조건이 있는 경우 해당 행에 대해 해당 조건이 true인 경우에만 대상 행이 수정됩니다. 그렇지 않으면 대상 행이 변경되지 않은 상태로 유지됩니다. - 여러
whenNotMatchedBySource
절이 있는 경우 지정된 순서대로 평가됩니다. 마지막 절을 제외한 모든whenNotMatchedBySource
절에 조건이 있어야 합니다. - 정의에 따라
whenNotMatchedBySource
절에는 열 값을 가져올 원본 행이 없으므로 원본 열을 참조할 수 없습니다. 수정할 각 열에 대해 리터럴을 지정하거나 대상 열(예:SET target.deleted_count = target.deleted_count + 1
)에서 작업을 수행할 수 있습니다.
-
Important
- 원본 데이터 세트의 여러 행이 일치하고 병합에서 대상 델타 테이블의 동일한 행을 업데이트하려고 하면
merge
작업이 실패할 수 있습니다. 병합의 SQL 의미 체계에 따르면 일치하는 대상 행을 업데이트하는 데 어떤 원본 행을 사용해야 하는지 불분명하기 때문에 이러한 업데이트 작업은 모호합니다. 원본 테이블을 전처리하여 여러 일치 항목이 발생할 가능성을 제거할 수 있습니다. - 뷰가
MERGE
로 정의된 경우에만 SQL VIEW에 SQLCREATE VIEW viewName AS SELECT * FROM deltaTable
작업을 적용할 수 있습니다.
델타 테이블에 쓸 때 데이터 중복 제거
일반적인 ETL 사용 사례는 로그를 테이블에 추가하여 Delta 테이블로 수집하는 것입니다. 그러나 원본에서 중복 로그 레코드를 생성할 수 있는 경우가 많으며 이를 처리하려면 다운스트림 중복 제거 단계가 필요합니다.
merge
를 사용할 때는 중복된 레코드가 삽입되는 것을 방지할 수 있습니다.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId") \
.whenNotMatchedInsertAll() \
.execute()
Scala
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute()
Java
deltaTable
.as("logs")
.merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId")
.whenNotMatched()
.insertAll()
.execute();
참고 항목
새 로그를 포함하는 데이터 세트는 자체적으로 중복 제거되어야 합니다. 병합의 SQL 의미 체계를 통해 새 데이터를 테이블의 기존 데이터와 일치시키고 중복 제거하지만 새 데이터 세트 내에 중복 데이터가 있는 경우 삽입됩니다. 따라서 테이블에 병합하기 전에 새 데이터를 중복 제거합니다.
며칠 동안만 중복 레코드를 가져올 수 있다는 것을 알고 있는 경우 테이블을 날짜별로 분할한 다음 일치시킬 대상 테이블의 날짜 범위를 지정하여 쿼리를 추가로 최적화할 수 있습니다.
SQL
MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
THEN INSERT *
Python
deltaTable.alias("logs").merge(
newDedupedLogs.alias("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
.whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
.execute()
Scala
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute()
Java
deltaTable.as("logs").merge(
newDedupedLogs.as("newDedupedLogs"),
"logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
.whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
.insertAll()
.execute();
이는 전체 테이블이 아닌 지난 7일 동안의 로그에서만 중복 항목을 찾으므로 이전 명령보다 더 효율적입니다. 또한 구조적 스트리밍과 이 삽입 전용 병합을 사용하여 로그의 연속 중복 제거를 수행할 수 있습니다.
- 스트리밍 쿼리에서는
foreachBatch
병합 작업을 사용하여 중복 제거를 사용하여 모든 스트리밍 데이터를 델타 테이블에 지속적으로 쓸 수 있습니다. 에 대한 자세한 내용은 아래의foreachBatch
를 참조하세요. - 다른 스트리밍 쿼리에서는 이 델타 테이블에서 중복 제거된 데이터를 지속적으로 읽을 수 있습니다. 삽입 전용 병합은 델타 테이블에 새 데이터만 추가하기 때문에 가능합니다.
Delta Lake를 사용하여 SCD(느린 변경 데이터) 및 CDC(변경 데이터 캡처)
Delta Live Tables는 SCD Type 1 및 Type 2를 추적하고 적용하기 위한 기본 지원을 제공합니다. 델타 라이브 테이블에서 APPLY CHANGES INTO
사용하여 CDC 피드를 처리할 때 잘못된 레코드가 올바르게 처리되도록 합니다.
변경 사항 적용 API: Delta Live Tables을 사용하여 변경 데이터 캡처를 간소화하는 방법을 참조하세요.
델타 테이블을 원본과 증분 방식으로 동기화
Databricks SQL 및 Databricks Runtime 12.2 LTS 이상에서는 WHEN NOT MATCHED BY SOURCE
사용하여 테이블의 일부를 원자성으로 삭제하고 바꾸는 임의의 조건을 만들 수 있습니다. 이는 초기 데이터 입력 후 며칠 동안 레코드가 변경되거나 삭제될 수 있지만 최종 상태로 정착되는 원본 테이블이 있는 경우에 특히 유용할 수 있습니다.
다음 쿼리에서는 이 패턴을 사용하여 원본에서 5일간의 레코드를 선택하고, 대상에서 일치하는 레코드를 업데이트하고, 원본에서 대상으로 새 레코드를 삽입하고, 대상에서 지난 5일 동안 일치하지 않는 모든 레코드를 삭제하는 방법을 보여 줍니다.
MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE
원본 및 대상 테이블에 동일한 부울 필터를 제공하면 삭제를 포함하여 원본에서 대상 테이블로 변경 내용을 동적으로 전파할 수 있습니다.
참고 항목
이 패턴은 조건부 절 없이 사용할 수 있지만 이로 인해 비용이 많이 들 수 있는 대상 테이블을 완전히 다시 작성하게 됩니다.