자습서: Delta Lake
이 자습서에서는 다음을 포함하여 Azure Databricks에 대한 일반적인 Delta Lake 작업을 소개합니다.
- 테이블을 만듭니다.
- 테이블에 Upsert합니다.
- 테이블에서 읽습니다.
- 테이블 기록을 표시합니다.
- 이전 버전의 테이블을 쿼리합니다.
- 테이블을 최적화합니다.
- Z 순서 인덱스를 추가합니다.
- 참조되지 않은 파일을 진공합니다.
클러스터와 같은 Azure Databricks 컴퓨팅 리소스에 연결된 Notebook 내에서 이 문서의 Python, Scala 및 SQL 코드 예제를 실행할 수 있습니다. Databricks SQL에서 SQL 웨어하우스와 연결된 쿼리 내에서 이 문서의 SQL 코드를 실행할 수도 있습니다.
원본 데이터 준비
이 자습서에서는 People 10 M이라는 데이터 세트를 사용합니다. 여기에는 이름과 성, 생년월일 및 급여와 같은 사람들에 대한 사실을 담고 있는 1천만 개의 가상의 기록이 포함되어 있습니다. 이 자습서에서는 이 데이터 세트가 대상 Azure Databricks 작업 영역과 연결된 Unity 카탈로그 볼륨 에 있다고 가정합니다.
이 자습서의 People 10 M 데이터 세트를 얻으려면 다음을 수행합니다.
- Kaggle의 피플 10 M 페이지로 이동합니다.
- 다운로드를 클릭하여 로컬 컴퓨터에 명명된
archive.zip
파일을 다운로드합니다. - 파일에서 명명된
export.csv
파일을 추출합니다archive.zip
. 이 파일에는export.csv
이 자습서의 데이터가 포함되어 있습니다.
볼륨에 export.csv
파일을 업로드하려면 다음을 수행합니다.
- 사이드바에서 카탈로그를 클릭합니다.
- 카탈로그 탐색기에서 파일을 업로드
export.csv
할 볼륨을 찾아 엽니다. - 이 볼륨에 업로드를 클릭합니다.
- 로컬 컴퓨터에서 파일을 끌어서 놓거나 찾아보고 선택합니다
export.csv
. - 업로드를 클릭합니다.
다음 코드 예제에서는 대상 볼륨의 파일 경로 export.csv
로 바꿉 /Volumes/main/default/my-volume/export.csv
니다.
테이블 만들기
Azure Databricks에서 만든 모든 테이블은 기본적으로 Delta Lake를 사용합니다. Databricks는 Unity 카탈로그 관리 테이블을 사용하는 것이 좋습니다.
이전 코드 예제 및 다음 코드 예제에서 테이블 이름을 main.default.people_10m
Unity 카탈로그의 대상 3부 카탈로그, 스키마 및 테이블 이름으로 바꿉니다.
참고 항목
Delta Lake는 모든 읽기, 쓰기 및 테이블 만들기 명령 Azure Databricks의 기본값입니다.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
SQL
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
이전 작업은 새 관리형 테이블을 만듭니다. 델타 테이블을 만들 때 사용할 수 있는 옵션에 대한 자세한 내용은 CREATE TABLE을 참조하세요.
Databricks Runtime 13.3 LTS 이상에서는 CREATE TABLE LIKE를 사용하여 원본 델타 테이블에 대한 스키마 및 테이블 속성을 복제하는 빈 델타 테이블을 새로 만들 수 있습니다. 이는 다음 코드 예제와 같이 개발 환경에서 프로덕션으로 테이블을 승격할 때 특히 유용할 수 있습니다.
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
빈 테이블을 만들려면 Python 및 Scala용 Delta Lake에서 API를 사용할 DeltaTableBuilder
수도 있습니다. 이와 동등한 DataFrameWriter API에 비해 이러한 API를 사용하면 열 주석, 테이블 속성 및 생성된 열과 같은 추가 정보를 더 쉽게 지정할 수 있습니다.
Important
이 기능은 공개 미리 보기 상태입니다.
Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
테이블에 Upsert
업데이트 및 삽입 집합을 기존 델타 테이블에 병합하려면 Python 및 Scala에 대한 메서드와 SQL용 MERGE INTO 문을 사용합니다DeltaTable.merge
. 예를 들어 다음 예제에서는 원본 테이블에서 데이터를 가져와 대상 델타 테이블에 병합합니다. 두 테이블에 일치하는 행이 있는 경우 Delta Lake는 지정된 식을 사용하여 데이터 열을 업데이트합니다. 일치하는 행이 없으면 Delta Lake에서 새 행을 추가합니다. 이 작업을 upsert라고 합니다.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
SQL
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
SQL에서 지정 *
하는 경우 원본 테이블에 대상 테이블과 동일한 열이 있다고 가정하여 대상 테이블의 모든 열을 업데이트하거나 삽입합니다. 대상 테이블에 동일한 열이 없으면 쿼리가 분석 오류를 throw합니다.
삽입 작업을 수행할 때 테이블의 모든 열에 대한 값을 지정해야 합니다(예: 기존 데이터 세트에 일치하는 행이 없는 경우). 그러나 모든 값을 업데이트할 필요는 없습니다.
결과를 보려면 테이블을 쿼리합니다.
Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SQL
SELECT * FROM main.default.people_10m WHERE id >= 9999998
테이블 읽기
다음 예제와 같이 테이블 이름 또는 테이블 경로로 델타 테이블의 데이터에 액세스합니다.
Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)
Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SQL
SELECT * FROM main.default.people_10m;
테이블에 쓰기
Delta Lake는 테이블에 데이터를 쓰는 데 표준 구문을 사용합니다.
기존 델타 테이블에 새 데이터를 원자성으로 추가하려면 다음 예제와 같이 추가 모드를 사용합니다.
Python
df.write.mode("append").saveAsTable("main.default.people_10m")
Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
SQL
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
테이블의 모든 데이터를 바꾸려면 다음 예제와 같이 덮어쓰기 모드를 사용합니다.
Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
SQL
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
테이블 업데이트
델타 테이블에서 조건자와 일치하는 데이터를 업데이트할 수 있습니다. 예를 들어 예제 people_10m
표에서 열 M
Female
Male
F
의 약어를 gender
변경하려면 다음을 실행할 수 있습니다.
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
SQL
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
테이블에서 삭제
델타 테이블에서 조건자와 일치하는 데이터를 제거할 수 있습니다. 예를 들어 예제 people_10m
테이블에서 이전1955
부터 열에 값이 있는 사용자에게 해당하는 모든 행을 birthDate
삭제하려면 다음을 실행할 수 있습니다.
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
SQL
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
Important
삭제하면 최신 버전의 Delta 테이블에서 데이터가 제거되지만 이전 버전이 명시적으로 진공 상태일 때까지 실제 스토리지에서 데이터를 제거하지 않습니다. 자세한 내용은 vacuum을 참조하세요.
테이블 기록 표시
테이블의 기록을 보려면 테이블에 쓸 때마다 테이블 버전, 작업, 사용자 등을 비롯한 출처 정보를 제공하는 SQL의 PYTHON 및 Scala 메서드와 DESCRIBE HISTORY 문을 사용합니다DeltaTable.history
.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
SQL
DESCRIBE HISTORY main.default.people_10m
이전 버전의 테이블 쿼리(시간 이동)
Delta Lake 시간 이동을 사용하면 델타 테이블의 이전 스냅샷을 쿼리할 수 있습니다.
이전 버전의 테이블을 쿼리하려면 테이블의 버전 또는 타임스탬프를 지정합니다. 예를 들어 이전 기록의 버전 0 또는 타임스탬프를 2024-05-15T22:43:15.000+00:00Z
쿼리하려면 다음을 사용합니다.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
타임스탬프의 경우 날짜 또는 타임스탬프 문자열만 허용됩니다(예 "2024-05-15T22:43:15.000+00:00"
"2024-05-15 22:43:15"
: ).
DataFrameReader 옵션을 사용하면 테이블의 특정 버전 또는 타임스탬프로 고정된 델타 테이블에서 DataFrame을 만들 수 있습니다. 예를 들면 다음과 같습니다.
Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
자세한 내용은 Delta Lake 테이블 기록 작업을 참조하세요.
테이블 최적화
테이블을 여러 차례 변경한 후에는 작은 파일이 많이 있을 수 있습니다. 읽기 쿼리의 속도를 향상시키기 위해 최적화 작업을 사용하여 작은 파일을 더 큰 파일로 축소할 수 있습니다.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE main.default.people_10m
열별 Z 순서
읽기 성능을 더 향상시키려면 z 순서를 지정하여 동일한 파일 집합에 관련 정보를 정렬할 수 있습니다. Delta Lake 데이터 건너뛰기 알고리즘은 이 배치를 사용하여 읽어야 하는 데이터의 양을 크게 줄입니다. z-order 데이터를 사용하려면 z 순서로 정렬할 열을 작업별로 지정합니다. 예를 들어 다음을 실행하여 정렬 gender
합니다.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
SQL
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
최적화 작업을 실행할 때 사용할 수 있는 전체 옵션 집합은 데이터 파일 레이아웃 최적화를 참조 하세요.
VACUUM
으로 스냅샷 정리
Delta Lake는 읽기에 대한 스냅샷 격리를 제공합니다. 즉, 다른 사용자 또는 작업이 테이블을 쿼리하는 동안에도 최적화 작업을 실행해도 안전합니다. 그러나 결국에는 이전 스냅샷을 정리해야 합니다. 진공 작업을 실행하여 이 작업을 수행할 수 있습니다.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
SQL
VACUUM main.default.people_10m
진공 작업을 효과적으로 사용하는 방법에 대한 자세한 내용은 진공 상태에서 사용되지 않는 데이터 파일 제거를 참조하세요.