Compartilhar via


Tutorial: Delta Lake

Este tutorial apresenta operações comuns do Delta Lake no Azure Databricks, incluindo o seguinte:

Execute o código Python, Scala e SQL de exemplo deste artigo em um notebook anexado a um recuso de computação do Azure Databricks, como um cluster. Também é possível executar o código SQL deste artigo em uma consulta associada a um SQL warehouse no SQL do Databricks.

Prepare os dados de origem

Este tutorial se baseia em um conjunto de dados chamado People 10 M. Ele contém 10 milhões de registros fictícios com fatos sobre pessoas, como nomes e sobrenomes, data de nascimento e salário. Este tutorial pressupõe que esse conjunto de dados esteja em um volume do Catálogo do Unity associado ao workspace do Azure Databricks de destino.

Para obter o conjunto de dados People 10 M para este tutorial, faça o seguinte:

  1. Vá para a página People 10 M no Kaggle.
  2. Clique em Baixar para baixar um arquivo nomeado archive.zip para seu computador local.
  3. Extraia o arquivo nomeado export.csv do arquivo archive.zip. O arquivo export.csv contém os dados deste tutorial.

Para carregar o arquivo export.csv no volume, faça o seguinte:

  1. Na barra lateral, clique em Catálogo.
  2. No Gerenciador de Catálogos, encontre e abra o volume no qual você deseja carregar o arquivo export.csv.
  3. Clique em Carregar este volume.
  4. Arraste e solte ou navegue até e selecione o arquivo export.csv em seu computador local.
  5. Clique em Carregar.

Nos exemplos de código a seguir, substitua /Volumes/main/default/my-volume/export.csv pelo caminho para o arquivo export.csv no volume de destino.

Criar uma tabela

Todas as tabelas criadas no Azure Databricks usam o Delta Lake por padrão. A Databricks recomenda a utilização de tabelas geridas pelo Unity Catalog.

No exemplo de código anterior e nos exemplos de código a seguir, substitua o nome da tabela main.default.people_10m pelo catálogo de três partes de destino, o esquema e o nome da tabela no Catálogo do Unity.

Observação

O Delta Lake é o padrão para todos os comandos de leitura/gravação e criação de tabelas do Azure Databricks.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

As operações anteriores criam uma nova tabela gerenciada. Para obter informações sobre as opções disponíveis ao criar uma tabela Delta, confira CREATE TABLE.

No Databricks Runtime 13.3 LTS e superior, você pode usar CREATE TABLE LIKE para criar uma nova tabela Delta vazia que duplica o esquema e as propriedades da tabela para uma tabela Delta de origem. Isso pode ser especialmente útil ao promover tabelas de um ambiente de desenvolvimento para produção, como é mostrado no exemplo de código a seguir:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

Para criar uma tabela vazia, você também pode usar a API DeltaTableBuilder no Delta Lake para Python e Scala. Em comparação com as APIs DataFrameWriter equivalentes, essas APIs facilitam a especificação de informações adicionais, como comentários de coluna, propriedades de tabela e colunas geradas.

Importante

Esse recurso está em uma versão prévia.

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Executar upsert para uma tabela

Para mesclar um conjunto de atualizações e inserções em uma tabela Delta existente, use o método DeltaTable.merge para Python e Scalae a instrução MERGE INTO para SQL. Por exemplo, o exemplo a seguir usa os dados da tabela de origem e os mescla na tabela Delta de destino. Quando há uma linha correspondente nas duas tabelas, o Delta Lake atualiza a coluna de dados usando a expressão especificada. Quando não há nenhuma linha correspondente, o Delta Lake adiciona uma nova linha. Essa operação é conhecida como upsert.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

No SQL, se você especificar *, isso atualiza ou insere todas as colunas na tabela de destino, supondo que a tabela de origem tenha as mesmas colunas que a tabela de destino. Se a tabela de destino não tiver as mesmas colunas, a consulta gerará um erro de análise.

Você precisa especificar um valor para cada coluna na tabela ao executar uma operação de inserção (por exemplo, quando não há nenhuma linha correspondente no conjunto de dados existente). No entanto, você não precisa atualizar todos os valores.

Para ver os resultados, consulte a tabela.

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

Ler uma tabela

Você acessa dados em tabelas Delta pelo nome da tabela ou pelo caminho da tabela, conforme mostrado nos seguintes exemplos:

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

Gravar em uma tabela

O Delta Lake usa a sintaxe padrão para gravar dados em tabelas.

Para adicionar atomicamente novos dados a uma tabela Delta existente, use o modo de acréscimo, como mostrado nos seguintes exemplos:

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

Para substituir todos os dados em uma tabela, use o modo substituir, como nos seguintes exemplos:

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

Atualizar uma tabela

Você pode atualizar dados que corresponde a um predicado em uma tabela Delta. Por exemplo, em uma tabela people_10m de exemplo, para alterar uma abreviatura na coluna gender de M ou F para Male ou Female, você pode executar o seguinte:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

Excluir de uma tabela

É possível remover dados que corresponde a um predicado de uma tabela Delta. Por exemplo, em uma tabela people_10m de exemplo, para excluir todas as linhas correspondentes a pessoas com um valor na coluna birthDate anterior a 1955, você pode executar o seguinte:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

Importante

A exclusão remove os dados da versão mais recente da tabela Delta, mas não os remove do armazenamento físico até que as versões antigas sejam explicitamente a vácuo. Consulte vácuo para obter detalhes.

Exibir o histórico de tabelas

Para exibir o histórico de uma tabela, use o método DeltaTable.history para Python e Scala e a instrução DESCRIBE HISTORY no SQL, que fornece informações de procedência, incluindo a versão da tabela, a operação, o usuário e assim por diante, para cada gravação em uma tabela.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

Consultar uma versão anterior da tabela (viagem no tempo)

A viagem no tempo do Delta Lake permite consultar um instantâneo mais antigo de uma tabela Delta.

Para consultar uma versão mais antiga de uma tabela, especifique a versão ou o carimbo de data/hora da tabela. Por exemplo, para consultar a versão 0 ou o carimbo de data/hora 2024-05-15T22:43:15.000+00:00Z do histórico anterior, use o seguinte:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

Para os carimbos de data/hora, somente cadeias de caracteres de data ou de carimbos de data/hora são aceitas, por exemplo, "2024-05-15T22:43:15.000+00:00" ou "2024-05-15 22:43:15".

As opções do DataFrameReader permitem que você crie um DataFrame com base em uma tabela Delta que seja corrigido para uma versão específica do carimbo de data/hora ou tabela, por exemplo:

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

Para ver detalhes, confira Trabalhar com o histórico de tabelas do Delta Lake.

Otimizar uma tabela

Depois de realizar várias alterações em uma tabela, você poderá ter muitos arquivos pequenos. A fim de aprimorar a velocidade das consultas de leitura, use a operação de otimização para recolher arquivos pequenos em arquivos maiores:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE main.default.people_10m

Ordem Z por colunas

Para aprimorar ainda mais o desempenho de leitura, coloque as informações relacionadas no mesmo conjunto de arquivos pela ordenação z. Algoritmos do Delta Lake que ignoram dados usam essa ordenação para reduzir drasticamente a quantidade de dados que precisam ser lidos. Para dados de ordem z, especifique as colunas a serem ordenadas na operação ordem z. Por exemplo, para agrupar por gender, execute:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

Para obter o conjunto completo de opções disponíveis ao executar a operação de otimização, consulte Otimizar o layout do arquivo de dados.

Limpar instantâneos com VACUUM

O Delta Lake fornece isolamento de instantâneo para leituras, o que significa que é seguro executar uma operação de otimização mesmo enquanto outros usuários ou trabalhos estão consultando a tabela. No entanto, por fim, você deve limpar os instantâneos antigos. Você pode fazer isso executando a operação de vácuo:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

Para obter detalhes sobre como usar a operação de vácuo efetivamente, consulte Remover arquivos de dados não utilizados com vácuo.