Delta Lake 테이블 만들기
Delta Lake는 데이터 레이크의 파일에 대한 관계형 스토리지 추상화를 제공하는 테이블을 기반으로 합니다.
데이터 프레임에서 Delta Lake 테이블 만들기
Delta Lake 테이블을 만드는 가장 쉬운 방법 중 하나는 데이터 프레임을 델타 형식으로 저장하고 테이블에 대한 데이터 파일 및 관련 메타데이터 정보를 저장할 경로를 지정하는 것입니다.
예를 들어 다음 PySpark 코드는 기존 파일의 데이터가 포함된 데이터 프레임을 로드한 다음 해당 데이터 프레임을 델타 형식으로 새 폴더 위치에 저장합니다.
# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)
# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)
델타 테이블을 저장한 후, 지정한 경로 위치에는 데이터에 대한 parquet 파일(데이터 프레임에 로드한 원본 파일의 형식에 관계없음)과 테이블의 트랜잭션 로그가 포함된 _delta_log 폴더가 포함됩니다.
참고
트랜잭션 로그는 테이블에 대한 모든 데이터 수정 내용을 기록합니다. 각 수정 내용을 기록하면 트랜잭션 일관성을 적용할 수 있고 테이블의 버전 관리 정보를 유지할 수 있습니다.
다음과 같이 덮어쓰기 모드를 사용하여 기존 Delta Lake 테이블을 데이터 프레임의 내용으로 바꿀 수 있습니다.
new_df.write.format("delta").mode("overwrite").save(delta_table_path)
추가 모드를 사용하여 데이터 프레임의 행을 기존 테이블에 추가할 수도 있습니다.
new_rows_df.write.format("delta").mode("append").save(delta_table_path)
조건부 업데이트 만들기
데이터 프레임에서 데이터를 수정한 다음 덮어쓰기를 통해 Delta Lake 테이블을 바꿀 수 있지만 데이터베이스에서 보다 일반적인 패턴은 기존 테이블의 행을 개별 트랜잭션 작업으로 삽입, 업데이트 또는 삭제하는 것입니다. 이와 같이 Delta Lake 테이블을 수정하려면 업데이트, 삭제 및 병합 작업을 지원하는 Delta Lake API에서 DeltaTable 개체를 사용할 수 있습니다. 예를 들어 다음 코드를 사용하여 category 열 값이 "Accessories"인 모든 행의 price 열을 업데이트할 수 있습니다.
from delta.tables import *
from pyspark.sql.functions import *
# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })
데이터 수정 내용은 트랜잭션 로그에 기록되고 필요에 따라 테이블 폴더에 새 parquet 파일이 만들어집니다.
팁
Data Lake API 사용에 대한 자세한 내용은 Delta Lake API 설명서를 참조하세요.
테이블의 이전 버전 쿼리
Delta Lake 테이블은 트랜잭션 로그를 통한 버전 관리가 지원됩니다. 트랜잭션 로그는 각 트랜잭션에 대한 타임스탬프 및 버전 번호를 기록하여 테이블 수정 내용을 기록합니다. 이 로그된 버전 데이터를 사용하여 이전 버전의 테이블을 볼 수 있습니다(이 기능을 시간 이동이라고 함).
델타 테이블 위치의 데이터를 데이터 프레임으로 읽고 필요한 버전을 versionAsOf
옵션으로 지정하여 Delta Lake 테이블의 특정 버전에서 데이터를 검색할 수 있습니다.
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
또는 timestampAsOf
옵션을 사용하여 타임스탬프를 지정할 수 있습니다.
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)