Tutorial: Delta Lake
Este tutorial apresenta operações comuns do Delta Lake no Azure Databricks, incluindo o seguinte:
- Criar uma tabela.
- Executar upsert para uma tabela.
- Ler de uma tabela.
- Exibir o histórico de tabelas.
- Consultar uma versão anterior de uma tabela.
- Otimizar uma tabela.
- Adicionar um índice de ordem Z.
- Aspirar arquivos não referenciados.
Execute o código Python, Scala e SQL de exemplo deste artigo em um notebook anexado a um recuso de computação do Azure Databricks, como um cluster. Também é possível executar o código SQL deste artigo em uma consulta associada a um SQL warehouse no SQL do Databricks.
Prepare os dados de origem
Este tutorial se baseia em um conjunto de dados chamado People 10 M. Ele contém 10 milhões de registros fictícios com fatos sobre pessoas, como nomes e sobrenomes, data de nascimento e salário. Este tutorial pressupõe que esse conjunto de dados esteja em um volume do Catálogo do Unity associado ao workspace do Azure Databricks de destino.
Para obter o conjunto de dados People 10 M para este tutorial, faça o seguinte:
- Vá para a página People 10 M no Kaggle.
- Clique em Baixar para baixar um arquivo nomeado
archive.zip
para seu computador local. - Extraia o arquivo nomeado
export.csv
do arquivoarchive.zip
. O arquivoexport.csv
contém os dados deste tutorial.
Para carregar o arquivo export.csv
no volume, faça o seguinte:
- Na barra lateral, clique em Catálogo.
- No Gerenciador de Catálogos, encontre e abra o volume no qual você deseja carregar o arquivo
export.csv
. - Clique em Carregar este volume.
- Arraste e solte ou navegue até e selecione o arquivo
export.csv
em seu computador local. - Clique em Carregar.
Nos exemplos de código a seguir, substitua /Volumes/main/default/my-volume/export.csv
pelo caminho para o arquivo export.csv
no volume de destino.
Criar uma tabela
Todas as tabelas criadas no Azure Databricks usam o Delta Lake por padrão. A Databricks recomenda a utilização de tabelas geridas pelo Unity Catalog.
No exemplo de código anterior e nos exemplos de código a seguir, substitua o nome da tabela main.default.people_10m
pelo catálogo de três partes de destino, o esquema e o nome da tabela no Catálogo do Unity.
Observação
O Delta Lake é o padrão para todos os comandos de leitura/gravação e criação de tabelas do Azure Databricks.
Python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", TimestampType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")
# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")
// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()
// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")
SQL
CREATE OR REPLACE TABLE main.default.people_10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
);
COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );
As operações anteriores criam uma nova tabela gerenciada. Para obter informações sobre as opções disponíveis ao criar uma tabela Delta, confira CREATE TABLE.
No Databricks Runtime 13.3 LTS e superior, você pode usar CREATE TABLE LIKE para criar uma nova tabela Delta vazia que duplica o esquema e as propriedades da tabela para uma tabela Delta de origem. Isso pode ser especialmente útil ao promover tabelas de um ambiente de desenvolvimento para produção, como é mostrado no exemplo de código a seguir:
CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m
Para criar uma tabela vazia, você também pode usar a API DeltaTableBuilder
no Delta Lake para Python e Scala. Em comparação com as APIs DataFrameWriter equivalentes, essas APIs facilitam a especificação de informações adicionais, como comentários de coluna, propriedades de tabela e colunas geradas.
Importante
Esse recurso está em uma versão prévia.
Python
DeltaTable.createIfNotExists(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Scala
DeltaTable.createOrReplace(spark)
.tableName("main.default.people_10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
Executar upsert para uma tabela
Para mesclar um conjunto de atualizações e inserções em uma tabela Delta existente, use o método DeltaTable.merge
para Python e Scalae a instrução MERGE INTO para SQL. Por exemplo, o exemplo a seguir usa os dados da tabela de origem e os mescla na tabela Delta de destino. Quando há uma linha correspondente nas duas tabelas, o Delta Lake atualiza a coluna de dados usando a expressão especificada. Quando não há nenhuma linha correspondente, o Delta Lake adiciona uma nova linha. Essa operação é conhecida como upsert.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date
schema = StructType([
StructField("id", IntegerType(), True),
StructField("firstName", StringType(), True),
StructField("middleName", StringType(), True),
StructField("lastName", StringType(), True),
StructField("gender", StringType(), True),
StructField("birthDate", DateType(), True),
StructField("ssn", StringType(), True),
StructField("salary", IntegerType(), True)
])
data = [
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]
people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")
# ...
from delta.tables import DeltaTable
deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')
(deltaTable.alias("people_10m")
.merge(
people_10m_updates.alias("people_10m_updates"),
"people_10m.id = people_10m_updates.id")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp
val schema = StructType(Array(
StructField("id", IntegerType, nullable = true),
StructField("firstName", StringType, nullable = true),
StructField("middleName", StringType, nullable = true),
StructField("lastName", StringType, nullable = true),
StructField("gender", StringType, nullable = true),
StructField("birthDate", TimestampType, nullable = true),
StructField("ssn", StringType, nullable = true),
StructField("salary", IntegerType, nullable = true)
))
val data = Seq(
Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)
val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")
// ...
import io.delta.tables.DeltaTable
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.as("people_10m")
.merge(
people_10m_updates.as("people_10m_updates"),
"people_10m.id = people_10m_updates.id"
)
.whenMatched()
.updateAll()
.whenNotMatched()
.insertAll()
.execute()
SQL
CREATE OR REPLACE TEMP VIEW people_10m_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
No SQL, se você especificar *
, isso atualiza ou insere todas as colunas na tabela de destino, supondo que a tabela de origem tenha as mesmas colunas que a tabela de destino. Se a tabela de destino não tiver as mesmas colunas, a consulta gerará um erro de análise.
Você precisa especificar um valor para cada coluna na tabela ao executar uma operação de inserção (por exemplo, quando não há nenhuma linha correspondente no conjunto de dados existente). No entanto, você não precisa atualizar todos os valores.
Para ver os resultados, consulte a tabela.
Python
df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)
Scala
val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)
SQL
SELECT * FROM main.default.people_10m WHERE id >= 9999998
Ler uma tabela
Você acessa dados em tabelas Delta pelo nome da tabela ou pelo caminho da tabela, conforme mostrado nos seguintes exemplos:
Python
people_df = spark.read.table("main.default.people_10m")
display(people_df)
Scala
val people_df = spark.read.table("main.default.people_10m")
display(people_df)
SQL
SELECT * FROM main.default.people_10m;
Gravar em uma tabela
O Delta Lake usa a sintaxe padrão para gravar dados em tabelas.
Para adicionar atomicamente novos dados a uma tabela Delta existente, use o modo de acréscimo, como mostrado nos seguintes exemplos:
Python
df.write.mode("append").saveAsTable("main.default.people_10m")
Scala
df.write.mode("append").saveAsTable("main.default.people_10m")
SQL
INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people
Para substituir todos os dados em uma tabela, use o modo substituir, como nos seguintes exemplos:
Python
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
Scala
df.write.mode("overwrite").saveAsTable("main.default.people_10m")
SQL
INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people
Atualizar uma tabela
Você pode atualizar dados que corresponde a um predicado em uma tabela Delta. Por exemplo, em uma tabela people_10m
de exemplo, para alterar uma abreviatura na coluna gender
de M
ou F
para Male
ou Female
, você pode executar o seguinte:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
)
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
SQL
UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';
Excluir de uma tabela
É possível remover dados que corresponde a um predicado de uma tabela Delta. Por exemplo, em uma tabela people_10m
de exemplo, para excluir todas as linhas correspondentes a pessoas com um valor na coluna birthDate
anterior a 1955
, você pode executar o seguinte:
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
SQL
DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'
Importante
A exclusão remove os dados da versão mais recente da tabela Delta, mas não os remove do armazenamento físico até que as versões antigas sejam explicitamente a vácuo. Consulte vácuo para obter detalhes.
Exibir o histórico de tabelas
Para exibir o histórico de uma tabela, use o método DeltaTable.history
para Python e Scala e a instrução DESCRIBE HISTORY no SQL, que fornece informações de procedência, incluindo a versão da tabela, a operação, o usuário e assim por diante, para cada gravação em uma tabela.
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())
SQL
DESCRIBE HISTORY main.default.people_10m
Consultar uma versão anterior da tabela (viagem no tempo)
A viagem no tempo do Delta Lake permite consultar um instantâneo mais antigo de uma tabela Delta.
Para consultar uma versão mais antiga de uma tabela, especifique a versão ou o carimbo de data/hora da tabela. Por exemplo, para consultar a versão 0 ou o carimbo de data/hora 2024-05-15T22:43:15.000+00:00Z
do histórico anterior, use o seguinte:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()
display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
Para os carimbos de data/hora, somente cadeias de caracteres de data ou de carimbos de data/hora são aceitas, por exemplo, "2024-05-15T22:43:15.000+00:00"
ou "2024-05-15 22:43:15"
.
As opções do DataFrameReader permitem que você crie um DataFrame com base em uma tabela Delta que seja corrigido para uma versão específica do carimbo de data/hora ou tabela, por exemplo:
Python
df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")
display(df)
Scala
val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")
display(df)
SQL
SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'
Para ver detalhes, confira Trabalhar com o histórico de tabelas do Delta Lake.
Otimizar uma tabela
Depois de realizar várias alterações em uma tabela, você poderá ter muitos arquivos pequenos. A fim de aprimorar a velocidade das consultas de leitura, use a operação de otimização para recolher arquivos pequenos em arquivos maiores:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()
SQL
OPTIMIZE main.default.people_10m
Ordem Z por colunas
Para aprimorar ainda mais o desempenho de leitura, coloque as informações relacionadas no mesmo conjunto de arquivos pela ordenação z. Algoritmos do Delta Lake que ignoram dados usam essa ordenação para reduzir drasticamente a quantidade de dados que precisam ser lidos. Para dados de ordem z, especifique as colunas a serem ordenadas na operação ordem z. Por exemplo, para agrupar por gender
, execute:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")
SQL
OPTIMIZE main.default.people_10m
ZORDER BY (gender)
Para obter o conjunto completo de opções disponíveis ao executar a operação de otimização, consulte Otimizar o layout do arquivo de dados.
Limpar instantâneos com VACUUM
O Delta Lake fornece isolamento de instantâneo para leituras, o que significa que é seguro executar uma operação de otimização mesmo enquanto outros usuários ou trabalhos estão consultando a tabela. No entanto, por fim, você deve limpar os instantâneos antigos. Você pode fazer isso executando a operação de vácuo:
Python
from delta.tables import *
deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()
SQL
VACUUM main.default.people_10m
Para obter detalhes sobre como usar a operação de vácuo efetivamente, consulte Remover arquivos de dados não utilizados com vácuo.