Руководство по Delta Lake
В этом руководстве представлены общие операции Delta Lake в Azure Databricks, в том числе следующие:
- Создать таблицу.
- Upsert к таблице.
- Чтение из таблицы.
- Показать журнал таблицы.
- Запрос более ранней версии таблицы.
- Оптимизация таблицы.
- Добавление индекса Z-порядка.
- Очистка файлов без ссылок.
Пример кода Python, Scala и SQL в этой статье можно запустить из записной книжки , подключенной к вычислительному ресурсу Azure Databricks, например кластеру. Вы также можете выполнить приведенный в этой статье код SQL из запроса, связанного с хранилищем SQL в Databricks SQL.
Подготовка исходных данных
В этом руководстве используется набор данных с именем People 10 M. Он содержит 10 миллионов вымышленных записей, которые содержат факты о людях, таких как первые и фамилии, дата рождения и зарплата. В этом руководстве предполагается, что этот набор данных находится в томе каталога Unity, связанном с целевой рабочей областью Azure Databricks.
Чтобы получить набор данных People 10 M для этого руководства, сделайте следующее:
- Перейдите на страницу "Люди 10 M" в Kaggle.
- Нажмите кнопку "Скачать ", чтобы скачать файл с именем
archive.zip
на локальный компьютер. - Извлеките файл с именем
export.csv
изarchive.zip
файла. Файлexport.csv
содержит данные для этого руководства.
Чтобы отправить export.csv
файл в том, сделайте следующее:
- На боковой панели щелкните "Каталог".
- В обозревателе каталогов найдите и откройте том, в котором нужно отправить
export.csv
файл. - Нажмите кнопку " Отправить в этот том".
- Перетащите и удалите файл или
export.csv
выберите файл на локальном компьютере. - Нажмите кнопку Отправить.
В следующих примерах кода замените /Volumes/main/default/my-volume/export.csv
путь к файлу в целевом export.csv
томе.
Создание таблицы
Таблицы, созданные в Azure Databricks, по умолчанию используют Delta Lake. Databricks рекомендует использовать управляемые таблицы каталога Unity.
В предыдущем примере кода и в следующих примерах кода замените имя таблицы целевым трехкомпонентным каталогом, схемой и именем main.default.people_10m
таблицы в каталоге Unity.
Примечание.
Delta Lake — это значение по умолчанию для всех команд создания таблиц и операций чтения, записи и таблицы Azure Databricks.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
SQL
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
Предыдущие операции создают новую управляемую таблицу. Сведения о доступных параметрах при создании разностной таблицы см. в статье CREATE TABLE.
В Databricks Runtime 13.3 LTS и более поздних версиях можно использовать create TABLE LIKE , чтобы создать пустую таблицу Delta, которая дублирует свойства схемы и таблицы для исходной разностной таблицы. Это может быть особенно полезно при продвижении таблиц из среды разработки в рабочую среду, как показано в следующем примере кода:
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
Чтобы создать пустую таблицу, можно также использовать DeltaTableBuilder
API в Delta Lake для Python и Scala. По сравнению с эквивалентными API DataFrameWriter эти API упрощают указание дополнительных сведений, таких как комментарии к столбцам, свойства таблицы и созданные столбцы.
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Upsert в таблицу
Чтобы объединить набор обновлений и вставок в существующую таблицу Delta, используйте DeltaTable.merge
метод для Python и Scala, а также инструкцию MERGE INTO для SQL. Например, следующий пример принимает данные из исходной таблицы и объединяет его в целевую таблицу Delta. Если в обеих таблицах есть совпадающая строка, Delta Lake обновляет столбец данных с использованием заданного выражения. Если совпадающая строка отсутствует, Delta Lake добавляет новую строку. Эта операция называется upsert.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
SQL
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
При указании *
в SQL это обновление или вставка всех столбцов в целевую таблицу предполагается, что исходная таблица имеет те же столбцы, что и целевая таблица. Если целевая таблица не имеет одинаковых столбцов, запрос выдает ошибку анализа.
При выполнении операции вставки необходимо указать значение для каждого столбца в таблице (например, если в существующем наборе данных нет соответствующей строки). Однако обновлять все значения не требуется.
Чтобы просмотреть результаты, отправьте к таблице запрос.
Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SQL
SELECT * FROM main.default.people_10m WHERE id >= 9999998
Чтение из таблицы
Доступ к данным в разностных таблицах можно получить по имени таблицы или пути к таблице, как показано в следующих примерах:
Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)
Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SQL
SELECT * FROM main.default.people_10m;
Запись в таблицу
Delta Lake использует стандартный синтаксис для записи данных в таблицы.
Чтобы атомарно добавить новые данные в существующую таблицу Delta, используйте режим добавления, как показано в следующих примерах:
Python
df.write.mode("append").saveAsTable("main.default.people_10m")
Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
SQL
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
Чтобы заменить все данные в таблице, используйте режим перезаписи, как показано в следующих примерах:
Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
SQL
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
Обновление таблицы
Вы можете обновить данные, соответствующие предикату в таблице Delta. Например, в таблице примера people_10m
можно изменить сокращение столбца из M
столбца gender
или F
Male
на или Female
выполнить следующее:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
SQL
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
Удаление из таблицы
Вы можете удалить данные, соответствующие предикату из таблицы Delta. Например, в таблице примера people_10m
для удаления всех строк, соответствующих людям со значением в birthDate
столбце, 1955
можно выполнить следующее:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
SQL
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
Внимание
Удаление удаляет данные из последней версии таблицы Delta, но не удаляет их из физического хранилища до тех пор, пока старые версии не будут явно вакуумированы. См. подробные сведения о команде vacuum.
Отображение журнала таблиц
Чтобы просмотреть журнал таблицы, используйте DeltaTable.history
метод для Python и Scala, а также инструкцию DESCRIBE HISTORY в SQL, которая предоставляет сведения о происхождении, включая версию таблицы, операцию, пользователя и т. д. для каждой записи в таблицу.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
SQL
DESCRIBE HISTORY main.default.people_10m
Запрос более ранней версии таблицы (путешествия по времени)
Переход по временем в Delta Lake позволяет запрашивать более ранний моментальный снимок таблицы Delta.
Чтобы запросить старую версию таблицы, укажите версию или метку времени таблицы. Например, чтобы запросить версию 0 или метку 2024-05-15T22:43:15.000+00:00Z
времени из предыдущей истории, используйте следующую команду:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
Для меток времени принимаются только строки даты или метки времени, например"2024-05-15T22:43:15.000+00:00"
."2024-05-15 22:43:15"
Параметры DataFrameReader позволяют создать кадр данных из таблицы Delta, фиксированной к определенной версии или метке времени таблицы, например:
Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
Дополнительные сведения см. в разделе " Работа с журналом таблиц Delta Lake".
Оптимизация таблицы
После выполнения нескольких изменений в таблице может потребоваться много небольших файлов. Чтобы повысить скорость чтения запросов, можно использовать операцию оптимизации для сворачивания небольших файлов в большие:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE main.default.people_10m
Z-порядок по столбцам
Чтобы повысить производительность чтения, можно совместно использовать связанные сведения в одном наборе файлов с помощью z-упорядочения. Алгоритмы пропуска данных Delta Lake используют это кололокацию для резкого уменьшения объема данных, которые необходимо считывать. Для данных z-order можно указать столбцы для заказа в z-порядке по операции. Например, чтобы выполнить сортировку по gender
команде, выполните следующую команду:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
SQL
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
Полный набор параметров, доступных при выполнении операции оптимизации, см. в разделе "Оптимизация макета файла данных".
Очистка моментальных снимков с помощью VACUUM
Delta Lake обеспечивает изоляцию моментальных снимков для операций чтения, что означает, что она безопасна для выполнения операции оптимизации, даже если другие пользователи или задания запрашивают таблицу. Однако в конечном итоге старые моментальные снимки нужно удалять. Это можно сделать, выполнив операцию вакуума:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
SQL
VACUUM main.default.people_10m
Дополнительные сведения об эффективном использовании операции вакуума см. в разделе "Удаление неиспользуемых файлов данных с помощью вакуума".