Criar tabelas do Delta Lake

Concluído

O Delta Lake é criado em tabelas, que fornecem uma abstração do armazenamento relacional em relação aos arquivos em um data lake.

Criar uma tabela do Delta Lake de um dataframe

Uma das maneiras mais fáceis de criar uma tabela do Delta Lake é salvar um dataframe no formato delta, especificando um caminho em que os arquivos de dados e as informações de metadados relacionadas para a tabela devem ser armazenados.

Por exemplo, o seguinte código PySpark carrega um dataframe contendo dados de um arquivo existente e, em seguida, salva esse dataframe em um novo local de pasta no formato delta:

# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)

# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)

Depois de salvar a tabela delta, o local do caminho especificado inclui arquivos parquet para os dados (independentemente do formato do arquivo de origem carregado no dataframe) e uma pasta _delta_log que contém o log de transações da tabela.

Observação

O log de transações registra todas as modificações de dados na tabela. Ao registrar cada modificação em log, a consistência transacional pode ser imposta e as informações de controle de versão da tabela podem ser retidas.

Você pode substituir uma tabela do delta lake existente pelo conteúdo de um dataframe usando o modo de substituição, conforme mostrado aqui:

new_df.write.format("delta").mode("overwrite").save(delta_table_path)

Você também pode adicionar linhas de um dataframe a uma tabela existente usando o modo de acréscimo:

new_rows_df.write.format("delta").mode("append").save(delta_table_path)

Fazer atualizações condicionais

Embora você possa fazer modificações de dados em um dataframe e substituir uma tabela delta lake substituindo-a, um padrão mais comum em um banco de dados é inserir, atualizar ou excluir linhas em uma tabela existente como operações transacionais discretas. Para fazer essas modificações em uma tabela do Delta Lake, você pode usar o objeto DeltaTable na API do Delta Lake, que dá suporte às operações update, delete e merge. Por exemplo, você pode usar o seguinte código para atualizar a coluna price para todas as linhas com um valor de "Acessories" para a coluna category:

from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

As modificações de dados são registradas no log de transações e novos arquivos parquet são criados na pasta da tabela, conforme necessário.

Dica

Para obter mais informações sobre como usar a API do Data Lake, consultea documentação da API do Delta Lake.

Consultar uma versão anterior de uma tabela

As tabelas do Delta Lake dão suporte a controle de versão por meio do log de transações. O log de transações registra modificações feitas na tabela, observando o carimbo de data/hora e o número da versão de cada transação. Você pode usar esses dados de versão registrados para exibir versões anteriores da tabela – um recurso conhecido como viagem no tempo.

Você pode recuperar dados de uma versão específica de uma tabela do Delta Lake lendo os dados do local da tabela delta em um dataframe, especificando a versão necessária como uma opção versionAsOf:

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)

Como alternativa, você pode especificar um carimbo de data/hora usando a opção timestampAsOf:

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)