다음을 통해 공유


Delta Lake 테이블에 병합을 사용하여 갱신 및 삽입하기

MERGE SQL 작업을 사용하여 원본 테이블, 뷰 또는 DataFrame의 데이터를 대상 델타 테이블로 업서트할 수 있습니다. Delta Lake는 삽입, 업데이트 및 삭제를 MERGE지원하며 고급 사용 사례를 용이하게 하기 위해 SQL 표준을 초과하는 확장 구문을 지원합니다.

people10mupdates라는 이름의 원본 테이블이나 /tmp/delta/people-10m-updates에 있는 원본 경로에, people10m라는 이름의 대상 테이블 또는 /tmp/delta/people-10m에 있는 대상 경로로 향하는 새 데이터가 있다고 가정합니다. 새 레코드 중 일부는 대상 데이터에 이미 있을 수 있습니다. 새 데이터를 병합하려면 사용자의 id 이미 있는 행을 업데이트하고 일치하는 id 없는 새 행을 삽입하려고 합니다. 다음 쿼리를 실행할 수 있습니다.

SQL

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

Python

from delta.tables import *

deltaTablePeople = DeltaTable.forName(spark, "people10m")
deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()

Scala

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forName(spark, "people10m")
val deltaTablePeopleUpdates = DeltaTable.forName(spark, "people10mupdates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()

Important

원본 테이블의 한 행만 대상 테이블의 지정된 행과 일치할 수 있습니다. Databricks Runtime 16.0 이상 MERGE 에서 and WHEN MATCHED 절에 ON 지정된 조건을 평가하여 중복 일치 항목을 확인합니다. Databricks Runtime 15.4 LTS 이하 MERGE 에서 작업은 절에 ON 지정된 조건만 고려합니다.

Scala 및 Python의 구문 세부 사항은 Delta Lake API 문서를 참조하세요. SQL 구문 세부 정보는 MERGE INTO 참조하세요.

병합을 사용하여 일치하지 않는 모든 행 수정

Databricks SQL 및 Databricks Runtime 12.2 LTS 이상 버전에서는, 원본 테이블에 해당하는 레코드가 없는 대상 테이블의 레코드를 WHEN NOT MATCHED BY SOURCE 절을 사용하여 UPDATE 또는 DELETE 할 수 있습니다. Databricks는 대상 테이블을 완전히 다시 작성하지 않도록 선택적 조건부 절을 추가하는 것이 좋습니다.

다음 코드 예제에서는 삭제에 이것을 사용하고, 대상 테이블을 원본 테이블의 내용으로 덮어쓰고, 대상 테이블에서 일치하지 않는 레코드를 삭제하는 기본 구문을 보여 줍니다. 시간에 구속된 원본 업데이트 및 삭제가 있는 테이블에 대한 보다 확장 가능한 패턴은 델타 테이블을 원본과 증분 동기화하는 방법을 참조하세요.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

다음 예제에서는 WHEN NOT MATCHED BY SOURCE 절에 조건을 추가하고 일치하지 않는 대상 행에서 업데이트할 값을 지정합니다.

Python

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)

Scala

targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()

SQL

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

작업 의미 체계 병합

다음은 프로그래밍 방식 작업 의미 체계에 merge 대한 자세한 설명입니다.

  • whenMatchedwhenNotMatched 절은 원하는 개수만큼 가질 수 있습니다.

  • whenMatched 절은 원본 행이 일치 조건에 따라 대상 테이블 행과 일치할 때 실행됩니다. 해당 절에는 다음과 같은 의미 체계가 있습니다.

    • whenMatched 절에는 최대 1개의 update 작업과 1개의 delete 작업이 포함될 수 있습니다. update 작업은 merge에서 일치하는 대상 행의 지정된 열만 업데이트합니다(이는 update작업와 유사합니다). delete 동작은 일치된 행을 삭제합니다.

    • whenMatched 절에 선택적 조건이 포함될 수 있습니다. 이 절 조건이 존재하는 경우, 절 조건이 true인 경우에만 일치하는 원본-대상 행 쌍에 대해 update 또는 delete 동작이 실행됩니다.

    • 여러 whenMatched 절이 있는 경우 지정된 순서대로 평가됩니다. 마지막 절을 제외한 모든 whenMatched 절에 조건이 있어야 합니다.

    • whenMatched 조건 중 병합 조건과 일치하는 원본-대상 행 쌍에 대해 true인 조건이 없는 경우, 대상 행이 변경되지 않습니다.

    • 대상 델타 테이블의 모든 열을 원본 데이터 세트의 해당 열로 업데이트하려면 whenMatched(...).updateAll()사용합니다. 다음 코드와 동일합니다.

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      대상 델타 테이블의 모든 열에 대해 따라서 이 작업은 원본 테이블에 대상 테이블과 같은 열이 있다고 가정하며, 그렇지 않으면 쿼리 분석 오류를 발생시킵니다.

      참고 항목

      이 동작은 자동 스키마 진화를 사용하도록 설정하면 변경됩니다. 자세한 내용은 자동 스키마 진화 참조하세요.

  • whenNotMatched 절은 일치 조건에 따라 원본 행이 대상 행과 일치하지 않을 때 실행됩니다. 해당 절에는 다음과 같은 의미 체계가 있습니다.

    • whenNotMatched 절은 하나의 insert 동작만 가질 수 있습니다. 새 행은 지정된 열 및 해당 식을 기반으로 생성됩니다. 대상 테이블의 모든 열을 지정할 필요는 없습니다. 지정되지 않은 대상 열의 경우 NULL 삽입됩니다.

    • whenNotMatched 절에 선택적 조건이 포함될 수 있습니다. 절 조건이 있는 경우 해당 행에 대한 조건이 true일 때만 원본 행이 삽입됩니다. 그렇지 않으면 원본 열이 무시됩니다.

    • 여러 whenNotMatched 절이 있는 경우 지정된 순서대로 평가됩니다. 마지막 절을 제외한 모든 whenNotMatched 절에 조건이 있어야 합니다.

    • 대상 델타 테이블의 모든 열을 원본 데이터 세트의 해당 열과 함께 삽입하려면 whenNotMatched(...).insertAll()사용합니다. 다음 코드와 동일합니다.

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      대상 델타 테이블의 모든 열에 대해 따라서 이 작업은 원본 테이블에 대상 테이블과 동일한 열이 있다고 가정하고, 그렇지 않으면 쿼리에서 분석 오류를 발생시킵니다.

      참고 항목

      이 동작은 자동 스키마 진화를 사용하도록 설정하면 변경됩니다. 자세한 내용은 자동 스키마 진화 참조하세요.

  • whenNotMatchedBySource 절은 대상 행이 병합 조건에 따라 원본 행과 일치하지 않을 때 실행됩니다. 해당 절에는 다음과 같은 의미 체계가 있습니다.

    • whenNotMatchedBySource 절은 지정 delete 하고 update 작업을 수행할 수 있습니다.
    • whenNotMatchedBySource 절에 선택적 조건이 포함될 수 있습니다. 절 조건이 있는 경우 해당 행에 대해 해당 조건이 true인 경우에만 대상 행이 수정됩니다. 그렇지 않으면 대상 행이 변경되지 않은 상태로 유지됩니다.
    • 여러 whenNotMatchedBySource 절이 있는 경우 지정된 순서대로 평가됩니다. 마지막 절을 제외한 모든 whenNotMatchedBySource 절에 조건이 있어야 합니다.
    • 정의에 따라 whenNotMatchedBySource 절에는 열 값을 가져올 원본 행이 없으므로 원본 열을 참조할 수 없습니다. 수정할 각 열에 대해 리터럴을 지정하거나 대상 열(예: SET target.deleted_count = target.deleted_count + 1)에서 작업을 수행할 수 있습니다.

Important

  • 원본 데이터 세트의 여러 행이 일치하고 병합에서 대상 델타 테이블의 동일한 행을 업데이트하려고 하면 merge 작업이 실패할 수 있습니다. 병합의 SQL 의미 체계에 따르면 일치하는 대상 행을 업데이트하는 데 어떤 원본 행을 사용해야 하는지 불분명하기 때문에 이러한 업데이트 작업은 모호합니다. 원본 테이블을 전처리하여 여러 일치 항목이 발생할 가능성을 제거할 수 있습니다.
  • 뷰가 MERGE로 정의된 경우에만 SQL VIEW에 SQL CREATE VIEW viewName AS SELECT * FROM deltaTable 작업을 적용할 수 있습니다.

델타 테이블에 쓸 때 데이터 중복 제거

일반적인 ETL 사용 사례는 로그를 테이블에 추가하여 Delta 테이블로 수집하는 것입니다. 그러나 원본에서 중복 로그 레코드를 생성할 수 있는 경우가 많으며 이를 처리하려면 다운스트림 중복 제거 단계가 필요합니다. merge를 사용할 때는 중복된 레코드가 삽입되는 것을 방지할 수 있습니다.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()

Scala

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()

Java

deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

참고 항목

새 로그를 포함하는 데이터 세트는 자체적으로 중복 제거되어야 합니다. 병합의 SQL 의미 체계를 통해 새 데이터를 테이블의 기존 데이터와 일치시키고 중복 제거하지만 새 데이터 세트 내에 중복 데이터가 있는 경우 삽입됩니다. 따라서 테이블에 병합하기 전에 새 데이터를 중복 제거합니다.

며칠 동안만 중복 레코드를 가져올 수 있다는 것을 알고 있는 경우 테이블을 날짜별로 분할한 다음 일치시킬 대상 테이블의 날짜 범위를 지정하여 쿼리를 추가로 최적화할 수 있습니다.

SQL

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *

Python

deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()

Scala

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()

Java

deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

이는 전체 테이블이 아닌 지난 7일 동안의 로그에서만 중복 항목을 찾으므로 이전 명령보다 더 효율적입니다. 또한 구조적 스트리밍과 이 삽입 전용 병합을 사용하여 로그의 연속 중복 제거를 수행할 수 있습니다.

  • 스트리밍 쿼리에서는 foreachBatch 병합 작업을 사용하여 중복 제거를 사용하여 모든 스트리밍 데이터를 델타 테이블에 지속적으로 쓸 수 있습니다. 에 대한 자세한 내용은 아래의 foreachBatch를 참조하세요.
  • 다른 스트리밍 쿼리에서는 이 델타 테이블에서 중복 제거된 데이터를 지속적으로 읽을 수 있습니다. 삽입 전용 병합은 델타 테이블에 새 데이터만 추가하기 때문에 가능합니다.

Delta Lake를 사용하여 SCD(느린 변경 데이터) 및 CDC(변경 데이터 캡처)

Delta Live Tables는 SCD Type 1 및 Type 2를 추적하고 적용하기 위한 기본 지원을 제공합니다. 델타 라이브 테이블에서 APPLY CHANGES INTO 사용하여 CDC 피드를 처리할 때 잘못된 레코드가 올바르게 처리되도록 합니다. 변경 사항 적용 API: Delta Live Tables을 사용하여 변경 데이터 캡처를 간소화하는 방법을 참조하세요.

델타 테이블을 원본과 증분 방식으로 동기화

Databricks SQL 및 Databricks Runtime 12.2 LTS 이상에서는 WHEN NOT MATCHED BY SOURCE 사용하여 테이블의 일부를 원자성으로 삭제하고 바꾸는 임의의 조건을 만들 수 있습니다. 이는 초기 데이터 입력 후 며칠 동안 레코드가 변경되거나 삭제될 수 있지만 최종 상태로 정착되는 원본 테이블이 있는 경우에 특히 유용할 수 있습니다.

다음 쿼리에서는 이 패턴을 사용하여 원본에서 5일간의 레코드를 선택하고, 대상에서 일치하는 레코드를 업데이트하고, 원본에서 대상으로 새 레코드를 삽입하고, 대상에서 지난 5일 동안 일치하지 않는 모든 레코드를 삭제하는 방법을 보여 줍니다.

MERGE INTO target AS t
USING (SELECT * FROM source WHERE created_at >= (current_date() - INTERVAL '5' DAY)) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
WHEN NOT MATCHED BY SOURCE AND created_at >= (current_date() - INTERVAL '5' DAY) THEN DELETE

원본 및 대상 테이블에 동일한 부울 필터를 제공하면 삭제를 포함하여 원본에서 대상 테이블로 변경 내용을 동적으로 전파할 수 있습니다.

참고 항목

이 패턴은 조건부 절 없이 사용할 수 있지만 이로 인해 비용이 많이 들 수 있는 대상 테이블을 완전히 다시 작성하게 됩니다.