Delta Lake 테이블 기록 작업
Delta Lake 테이블을 수정하는 각 작업에 따라 새 테이블 버전이 만들어집니다. 기록 정보를 사용하여 작업 감사, 테이블 롤백 또는 시간 여행을 사용하여 특정 시점에 테이블을 쿼리할 수 있습니다.
참고 항목
Databricks는 데이터 보관을 위한 장기 백업 솔루션으로 Delta Lake 테이블 기록을 사용하는 것을 권장하지 않습니다. Databricks는 데이터 및 로그 보존 구성을 모두 더 큰 값으로 설정하지 않는 한 시간 이동 작업에 지난 7일만 사용할 것을 권장합니다.
델타 테이블 기록 검색
명령을 실행 history
하여 Delta 테이블에 대한 각 쓰기에 대한 작업, 사용자 및 타임스탬프를 포함한 정보를 검색할 수 있습니다. 연산은 역순으로 반환됩니다.
테이블 기록 보존은 기본적으로 30일인 테이블 설정 delta.logRetentionDuration
에 의해 결정됩니다.
참고 항목
시간 이동 및 테이블 기록은 서로 다른 보존 임계값에 의해 제어됩니다. Delta Lake 시간 이동이란?을 참조하세요.
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
Spark SQL 구문 세부 정보는 DESCRIBE HISTORY를 참조하세요.
Scala/Java/Python 구문에 대한 자세한 내용은 Delta Lake API 설명서를 참조하세요.
카탈로그 탐색기는 델타 테이블에 대한 이 자세한 테이블 정보 및 기록을 시각적으로 볼 수 있습니다. 테이블 스키마 및 샘플 데이터 외에도 기록 탭을 클릭하여 DESCRIBE HISTORY
를 통해 표시하는 테이블 기록을 볼 수 있습니다.
기록 스키마
history
작업의 출력은 다음과 같은 열을 갖습니다.
Column | Type | 설명 |
---|---|---|
version | long | 작업에 의해 생성된 테이블 버전. |
timestamp | timestamp | 이 버전이 커밋된 시점. |
userId | string | 작업을 실행한 사용자의 ID. |
userName | string | 작업을 실행한 사용자의 이름. |
operation | string | 작업의 이름입니다. |
operationParameters | map | 작업의 매개 변수(예: 조건자). |
작업(job) | struct | 작업을 실행한 작업의 세부 정보. |
Notebook | struct | 작업이 실행된 Notebook의 세부 정보. |
clusterId | string | 작업이 실행된 클러스터의 ID. |
readVersion | long | 쓰기 작업을 수행하기 위해 읽은 테이블의 버전. |
isolationLevel | string | 이 작업에 사용된 격리 수준. |
isBlindAppend | 부울 값 | 이 작업에 데이터가 추가되었는지 여부. |
operationMetrics | map | 작업의 메트릭(예: 수정된 행 및 파일의 수). |
userMetadata | string | 지정된 경우, 사용자 정의 커밋 메타데이터 |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
참고 항목
- 다음 메서드를 사용하여 Delta 테이블에 작성하는 경우 다른 열 중 일부를 사용할 수 없습니다.
- 추후 추가되는 열은 항상 마지막 열 뒤에 추가됩니다.
작업 메트릭 키
history
작업은 operationMetrics
열 맵으로 작업 메트릭 모음을 반환합니다.
다음 표에는 작업별 맵 키 정의가 나와 있습니다.
연산 | 메트릭 이름 | 설명 |
---|---|---|
WRITE, CREATE TABLE AS SELECT, REPLACE TABLE AS SELECT, COPY INTO | ||
numFiles | 쓰기된 파일의 수. | |
numOutputBytes | 쓰기된 콘텐츠의 크기(바이트). | |
numOutputRows | 쓰기된 행의 수. | |
STREAMING UPDATE | ||
numAddedFiles | 추가된 파일의 수. | |
numRemovedFiles | 제거된 파일의 수. | |
numOutputRows | 쓰기된 행의 수. | |
numOutputBytes | 쓰기의 크기(바이트). | |
DELETE | ||
numAddedFiles | 추가된 파일의 수. 테이블의 파티션이 삭제된 경우에는 제공되지 않음. | |
numRemovedFiles | 제거된 파일의 수. | |
numDeletedRows | 제거된 행의 수. 테이블의 파티션이 삭제된 경우에는 제공되지 않음. | |
numCopiedRows | 파일을 삭제하는 과정에서 복사된 행의 수. | |
executionTimeMs | 작업 전체를 실행하는 데 걸린 시간. | |
scanTimeMs | 파일에서 일치하는 항목을 검색하는 데 걸린 시간. | |
rewriteTimeMs | 일치하는 파일을 다시 쓰는 데 걸린 시간. | |
TRUNCATE | ||
numRemovedFiles | 제거된 파일의 수. | |
executionTimeMs | 작업 전체를 실행하는 데 걸린 시간. | |
MERGE | ||
numSourceRows | 원본 DataFrame의 행 수. | |
numTargetRowsInserted | 대상 테이블에 삽입된 행 수. | |
numTargetRowsUpdated | 대상 테이블에서 업데이트된 행 수. | |
numTargetRowsDeleted | 대상 테이블에서 삭제된 행 수. | |
numTargetRowsCopied | 복사된 대상 행의 수. | |
numOutputRows | 출력으로 쓰기된 행의 총 수. | |
numTargetFilesAdded | 싱크(대상)에 추가된 파일의 수. | |
numTargetFilesRemoved | 싱크(대상)에서 제거된 파일의 수. | |
executionTimeMs | 작업 전체를 실행하는 데 걸린 시간. | |
scanTimeMs | 파일에서 일치하는 항목을 검색하는 데 걸린 시간. | |
rewriteTimeMs | 일치하는 파일을 다시 쓰는 데 걸린 시간. | |
UPDATE | ||
numAddedFiles | 추가된 파일의 수. | |
numRemovedFiles | 제거된 파일의 수. | |
numUpdatedRows | 업데이트된 행의 수. | |
numCopiedRows | 파일을 업데이트하는 과정에서 방금 복사된 행의 수. | |
executionTimeMs | 작업 전체를 실행하는 데 걸린 시간. | |
scanTimeMs | 파일에서 일치하는 항목을 검색하는 데 걸린 시간. | |
rewriteTimeMs | 일치하는 파일을 다시 쓰는 데 걸린 시간. | |
FSCK | numRemovedFiles | 제거된 파일의 수. |
CONVERT | numConvertedFiles | 변환된 Parquet 파일의 수. |
OPTIMIZE | ||
numAddedFiles | 추가된 파일의 수. | |
numRemovedFiles | 최적화된 파일의 수. | |
numAddedBytes | 테이블이 최적화된 후 추가된 바이트 수. | |
numRemovedBytes | 제거된 바이트 수. | |
minFileSize | 테이블이 최적화된 후 가장 작은 파일의 크기. | |
p25FileSize | 테이블이 최적화된 후 25번째 백분위수 파일의 크기. | |
p50FileSize | 테이블이 최적화된 후의 파일 크기의 중앙값. | |
p75FileSize | 테이블이 최적화된 후 75번째 백분위수 파일의 크기. | |
maxFileSize | 테이블이 최적화된 후 가장 큰 파일의 크기. | |
클론 | ||
sourceTableSize | 복제된 버전에서 원본 테이블의 크기(바이트). | |
sourceNumOfFiles | 복제된 버전에서 원본 테이블의 파일 수. | |
numRemovedFiles | 이전 델타 테이블을 바꾼 경우 대상 테이블에서 제거된 파일의 수. | |
removedFilesSize | 이전 델타 테이블을 바꾼 경우 대상 테이블에서 제거된 파일의 총 크기(바이트). | |
numCopiedFiles | 새 위치로 복사된 파일의 수. 얕은 복제의 경우 0. | |
copiedFilesSize | 새 위치로 복사된 파일의 총 크기(바이트). 얕은 복제의 경우 0. | |
RESTORE | ||
tableSizeAfterRestore | 복원 후 테이블 크기(바이트). | |
numOfFilesAfterRestore | 복원 후 테이블의 파일 수. | |
numRemovedFiles | 복원 작업으로 제거된 파일의 수. | |
numRestoredFiles | 복원의 결과로 추가된 파일의 수. | |
removedFilesSize | 복원에 의해 제거된 파일의 크기(바이트). | |
restoredFilesSize | 복원에 의해 추가된 파일의 크기(바이트). | |
VACUUM | ||
numDeletedFiles | 삭제된 파일의 수. | |
numVacuumedDirectories | vacuum된 디렉터리의 수. | |
numFilesToDelete | 삭제할 파일의 수. |
Delta Lake 시간 이동이란?
Delta Lake 시간 이동은 타임스탬프 또는 테이블 버전(트랜잭션 로그에 기록된 대로)에 따라 이전 테이블 버전 쿼리를 지원합니다. 다음과 같은 애플리케이션에 시간 이동 기능을 사용할 수 있습니다.
- 분석, 보고서 또는 출력(예: 기계 학습 모델의 출력)을 다시 생성합니다. 특히 규제 산업에서 디버깅 또는 감사에 유용할 수 있습니다.
- 복잡한 임시 쿼리를 작성합니다.
- 데이터의 실수를 수정합니다.
- 빠르게 변화하는 테이블을 위한 일련의 쿼리에 스냅샷 격리를 제공합니다.
Important
시간 이동으로 액세스할 수 있는 테이블 버전은 트랜잭션 로그 파일의 보존 임계값과 작업의 빈도 및 지정된 보존의 조합에 VACUUM
따라 결정됩니다. 기본값을 사용하여 매일 실행하는 VACUUM
경우 시간 이동에 7일의 데이터를 사용할 수 있습니다.
Delta 시간 이동 구문
테이블 이름 사양 다음에 절을 추가하여 시간 이동이 있는 델타 테이블을 쿼리합니다.
timestamp_expression
는 다음 중 하나일 수 있습니다.'2018-10-18T22:15:12.013Z'
, 즉, 타임스탬프로 캐스팅할 수 있는 문자열cast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
, 즉, 날짜 문자열current_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- 타임스탬프이거나 타임스탬프로 캐스팅될 수 있는 기타 식
version
은DESCRIBE HISTORY table_spec
출력에서 가져올 수 있는 긴 값입니다.
timestamp_expression
및 version
는 모두 하위 쿼리가 될 수 없습니다.
날짜 또는 타임스탬프 문자열만 허용됩니다. 예를 들어 "2019-01-01"
및 "2019-01-01T00:00:00.000Z"
를 지정합니다. 다음 코드에서 예제 구문 참조:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
@
구문을 사용하여 테이블 이름의 일부로 타임스탬프 또는 버전을 지정할 수도 있습니다. 타임스탬프는 yyyyMMddHHmmssSSS
형식이어야 합니다. 버전 앞에 v
를 추가하여 @
뒤에 버전을 지정할 수 있습니다. 다음 코드에서 예제 구문 참조:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
트랜잭션 로그 검사점이란?
Delta Lake는 테이블 데이터와 함께 저장되는 _delta_log
디렉터리 내에서 테이블 버전을 JSON 파일로 기록합니다. 검사점 쿼리를 최적화하기 위해 Delta Lake는 테이블 버전을 Parquet 검사점 파일에 집계하므로 테이블 기록의 모든 JSON 버전을 읽을 필요가 없습니다. Azure Databricks는 데이터 크기 및 워크로드에 대한 검사점 빈도를 최적화합니다. 사용자는 검사점과 직접 상호 작용할 필요가 없습니다. 검사점 빈도는 예고 없이 변경될 수 있습니다.
시간 이동 쿼리에 대한 데이터 보존 구성
이전 테이블 버전을 쿼리하려면 해당 버전의 로그와 데이터 파일을 모두 유지해야 합니다.
테이블에 대해 실행하면 VACUUM
데이터 파일이 삭제됩니다. Delta Lake는 테이블 버전을 검사한 후 로그 파일 제거를 자동으로 관리합니다.
대부분의 델타 테이블은 정기적으로 실행되므로 VACUUM
지정 시간 쿼리는 기본적으로 7일인 보존 임계값을 VACUUM
준수해야 합니다.
델타 테이블에 대한 데이터 보존 임계값을 늘리려면 다음 테이블 속성을 구성해야 합니다.
delta.logRetentionDuration = "interval <interval>"
: 테이블에 대한 기록이 유지되는 기간을 제어합니다. 기본값은interval 30 days
입니다.delta.deletedFileRetentionDuration = "interval <interval>"
: 현재 테이블 버전에서 더 이상 참조되지 않는 데이터 파일을 제거하는 데 사용되는 임계값VACUUM
을 결정합니다. 기본값은interval 7 days
입니다.
테이블을 만드는 동안 델타 속성을 지정하거나 문을 사용하여 ALTER TABLE
설정할 수 있습니다. Delta 테이블 속성 참조를 참조하세요.
참고 항목
자주 VACUUM
작업하는 테이블에 대해 테이블 기록이 더 긴 기간 동안 유지되도록 두 속성을 모두 설정해야 합니다. 예를 들어 30일의 기록 데이터에 액세스하려면 기본 설정delta.logRetentionDuration
과 일치하도록 설정합니다 delta.deletedFileRetentionDuration = "interval 30 days"
.
데이터 보존 임계값을 늘리면 더 많은 데이터 파일이 유지 관리됨에 따라 스토리지 비용이 증가할 수 있습니다.
델타 테이블을 이전 상태로 복원
RESTORE
명령을 사용하여 델타 테이블을 이전 상태로 복원할 수 있습니다. 델타 테이블은 내부적으로 테이블의 이전 버전을 유지 관리하므로 이전 상태로 복원할 수 있습니다.
이전 상태에 해당하는 버전 또는 이전 상태가 만들어진 시점의 타임스탬프가 RESTORE
명령에 의해 옵션으로 지원됩니다.
Important
- 이미 복원된 테이블을 복원할 수 있습니다.
- 복제된 테이블을 복원할 수 있습니다.
- 복원할 테이블에 대한
MODIFY
권한이 필요합니다. - 데이터 파일이 수동으로 또는 에 의해
vacuum
삭제된 이전 버전으로 테이블을 복원할 수 없습니다.spark.sql.files.ignoreMissingFiles
가true
로 설정된 경우에는 이 버전으로 부분적으로 복원하는 것이 가능합니다. - 이전 상태로 복원하기 위한 타임스탬프 형식은
yyyy-MM-dd HH:mm:ss
입니다. 날짜(yyyy-MM-dd
) 문자열만 제공하는 것도 지원됩니다.
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
자세한 내용은 RESTORE를 참조하세요.
Important
복원은 데이터 변경 작업으로 간주됩니다. RESTORE
명령에 의해 추가된 Delta Lake 로그 항목에는 true로 설정된 dataChange가 포함됩니다. Delta Lake 테이블에 대한 업데이트를 처리하는 구조적 스트리밍 작업과 같은 다운스트림 애플리케이션이 있는 경우 복원 작업에서 추가한 데이터 변경 로그 항목은 새 데이터 업데이트로 간주되며 이를 처리하면 데이터가 중복될 수 있습니다.
예시:
테이블 버전 | 연산 | 델타 로그 업데이트 | 데이터 변경 로그 업데이트의 레코드 |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (name = Viktor, age = 29, (name = George, age = 55) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (name = George, age = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (최적화 압축으로 레코드가 없는 경우 테이블의 데이터가 변경되지 않음) |
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) |
앞의 예제에서 RESTORE
명령은 델타 테이블 버전 0 및 1을 읽을 때 이미 표시된 업데이트를 생성합니다. 스트리밍 쿼리가 이 테이블을 읽는 경우 이러한 파일은 새로 추가된 데이터로 간주되어 다시 처리됩니다.
메트릭 복원
RESTORE
는 작업이 완료되면 다음과 같은 메트릭을 단일 행 DataFrame으로 보고합니다.
table_size_after_restore
: 복원 후 테이블의 크기.num_of_files_after_restore
: 복원 후 테이블의 파일 수.num_removed_files
: 테이블에서 제거된(논리적으로 삭제된) 파일의 수.num_restored_files
: 롤백으로 인해 복원된 파일의 수.removed_files_size
: 테이블에서 제거된 파일의 총 크기(바이트).restored_files_size
: 복원된 파일의 총 크기(바이트).
Delta Lake 시간 이동 사용 예제
실수로 인한 사용자(
111
)에 대한 테이블 삭제를 수정합니다.INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
실수로 인한 테이블에 대한 잘못된 업데이트를 수정합니다.
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
지난 주에 추가된 신규 고객 수를 쿼리합니다.
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
Spark 세션에서 마지막 커밋의 버전을 어떻게 찾습니까?
모든 스레드와 모든 테이블에서 현재 SparkSession
에 의해 써진 마지막 커밋의 버전 번호를 가져오려면 SQL 구성 spark.databricks.delta.lastCommitVersionInSession
을 쿼리합니다.
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
SparkSession
에서 만들어진 커밋이 없는 경우 키를 쿼리하면 빈 값이 반환됩니다.
참고 항목
여러 스레드에서 동일한 SparkSession
을 공유하는 것은 여러 스레드에서 하나의 변수를 공유하는 것과 비슷합니다. 즉, 구성 값이 동시에 업데이트될 때 경합 조건이 발생합니다.