Delta Lake를 사용하여 선택적으로 데이터 덮어쓰기
Azure Databricks는 Delta Lake 기능을 활용하여 선택적 덮어쓰기를 위한 두 가지 고유한 옵션을 지원합니다.
replaceWhere
옵션은 지정된 조건자와 일치하는 모든 레코드를 원자성으로 바꿉니다.- 동적 파티션 덮어쓰기를 사용하여 테이블을 분할하는 방법에 따라 데이터 디렉터리를 바꿀 수 있습니다.
대부분의 작업에서 Databricks는 덮어쓸 데이터를 지정하는 데 사용하는 replaceWhere
것이 좋습니다.
Important
데이터가 실수로 덮어쓰여진 경우 복원을 사용하여 변경 내용을 실행 취소할 수 있습니다.
replaceWhere
를 사용하여 임의 선택적 덮어쓰기
임의의 식과 일치하는 데이터만 선택적으로 덮어쓸 수 있습니다.
참고 항목
SQL에는 Databricks Runtime 12.2 LTS 이상이 필요합니다.
다음 명령은 start_date
로 분할된 대상 테이블의 1월 이벤트를 replace_data
의 원자 단위 데이터로 바꿉니다.
Python
(replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
)
Scala
replace_data.write
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.table("events")
SQL
INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
이 샘플 코드는 replace_data
데이터를 작성하고, 모든 행이 조건자와 일치하는지 확인하고, 의미 체계를 사용하여 overwrite
원자성 대체를 수행합니다. 작업의 값이 제약 조건을 벗어나면 기본적으로 오류가 발생하여 이 작업이 실패합니다.
이 동작을 overwrite
조건자 범위 내의 값과 insert
지정된 범위를 벗어난 레코드로 변경할 수 있습니다. 이렇게 하려면 다음 설정 중 하나를 사용하여 false로 설정 spark.databricks.delta.replaceWhere.constraintCheck.enabled
하여 제약 조건 검사를 사용하지 않도록 설정합니다.
Python
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
레거시 동작
레거시 기본 동작은 replaceWhere
파티션 열에만 조건자 일치 데이터를 덮어씁니다. 이 레거시 모델을 사용하면 다음 명령이 대상 테이블의 1월 월을 원자성으로 대체합니다. 이 월은 다음의 데이터df
로 분할됩니다date
.
Python
(df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
)
Scala
df.write
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.table("people10m")
이전 동작으로 대체하려는 경우 플래그를 spark.databricks.delta.replaceWhere.dataColumns.enabled
사용하지 않도록 설정할 수 있습니다.
Python
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
Scala
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SQL
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
동적 파티션 덮어쓰기
Important
이 기능은 공개 미리 보기 상태입니다.
Databricks Runtime 11.3 LTS 이상은 분할된 테이블에 대한 동적 파티션 덮어쓰기 모드를 지원합니다. 여러 파티션이 있는 테이블의 경우 Databricks Runtime 11.3 LTS 이하에서는 모든 파티션 열이 동일한 데이터 형식인 경우에만 동적 파티션 덮어쓰기를 지원합니다.
동적 파티션 덮어쓰기 모드에서 작업은 쓰기가 새 데이터를 커밋하는 각 논리 파티션의 모든 기존 데이터를 덮어씁니다. 쓰기에 데이터가 포함되지 않은 기존 논리 파티션은 변경되지 않은 상태로 유지됩니다. 이 모드는 데이터가 덮어쓰기 모드로 작성되는 경우, 즉 SQL에서 INSERT OVERWRITE
를 사용하거나 df.write.mode("overwrite")
에서 DataFrame을 사용하여 작성되는 경우에만 적용됩니다.
Spark 세션 구성 spark.sql.sources.partitionOverwriteMode
를 dynamic
으로 설정하여 동적 파티션 덮어쓰기 모드를 구성합니다. DataFrameWriter
옵션 partitionOverwriteMode
를 dynamic
으로 설정하여 이 옵션을 사용하도록 설정할 수도 있습니다. 있는 경우 쿼리별 옵션은 세션 구성에 정의된 모드를 재정의합니다. partitionOverwriteMode
에 대한 기본값은 static
입니다.
Important
동적 파티션 덮어쓰기를 사용하여 작성된 데이터가 예상되는 파티션에만 영향을 주는지 확인합니다. 잘못된 파티션의 단일 행은 의도치 않게 전체 파티션을 덮어쓸 수 있습니다.
다음 예제에서는 동적 파티션 덮어쓰기를 사용하는 방법을 보여 줍니다.
SQL
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
Python
(df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
)
Scala
df.write
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
참고 항목
- 동적 파티션 덮어쓰기가 분할된 테이블에 대한 옵션
replaceWhere
와 충돌합니다.- Spark 세션 구성에서 동적 파티션 덮어쓰기를 사용하도록 설정하고
replaceWhere
가DataFrameWriter
옵션으로 제공된 경우 Delta Lake는replaceWhere
식에 따라 데이터를 덮어씁니다(쿼리별 옵션은 세션 구성을 재정의함). - 옵션에
DataFrameWriter
동적 파티션 덮어쓰기 및replaceWhere
사용 설정이 모두 있는 경우 오류가 발생합니다.
- Spark 세션 구성에서 동적 파티션 덮어쓰기를 사용하도록 설정하고
- 동적 파티션 덮어쓰기를 사용할 때는
true
지정할overwriteSchema
수 없습니다.