Criar tabelas Delta Lake

Concluído

O lago delta é construído em tabelas, que fornecem uma abstração de armazenamento relacional sobre arquivos em um data lake.

Criando uma tabela Delta Lake a partir de um dataframe

Uma das maneiras mais fáceis de criar uma tabela Delta Lake é salvar um dataframe no formato delta , especificando um caminho onde os arquivos de dados e as informações de metadados relacionadas para a tabela devem ser armazenados.

Por exemplo, o seguinte código PySpark carrega um dataframe com dados de um arquivo existente e, em seguida, salva esse dataframe em um novo local de pasta no formato delta :

# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)

# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)

Depois de salvar a tabela delta, o local do caminho especificado inclui arquivos de parquet para os dados (independentemente do formato do arquivo de origem carregado no dataframe) e uma pasta _delta_log contendo o log de transações da tabela.

Nota

O log de transações registra todas as modificações de dados na tabela. Ao registrar cada modificação, a consistência transacional pode ser imposta e as informações de controle de versão da tabela podem ser mantidas.

Você pode substituir uma tabela Delta Lake existente pelo conteúdo de um quadro de dados usando o modo de substituição , conforme mostrado aqui:

new_df.write.format("delta").mode("overwrite").save(delta_table_path)

Você também pode adicionar linhas de um dataframe a uma tabela existente usando o modo de acréscimo :

new_rows_df.write.format("delta").mode("append").save(delta_table_path)

Fazer atualizações condicionais

Embora você possa fazer modificações de dados em um quadro de dados e, em seguida, substituir uma tabela Delta Lake substituindo-a, um padrão mais comum em um banco de dados é inserir, atualizar ou excluir linhas em uma tabela existente como operações transacionais discretas. Para fazer essas modificações em uma tabela Delta Lake, você pode usar o objeto DeltaTable na API Delta Lake, que oferece suporte a operações de atualização, exclusão e mesclagem . Por exemplo, você pode usar o código a seguir para atualizar a coluna de preço para todas as linhas com um valor de coluna de categoria de "Acessórios":

from delta.tables import *
from pyspark.sql.functions import *

# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

As modificações de dados são registradas no log de transações, e novos arquivos de parquet são criados na pasta da tabela, conforme necessário.

Gorjeta

Para obter mais informações sobre como usar a API Delta Lake, consulte a documentação da API Delta Lake.

Consultar uma versão anterior de uma tabela

As tabelas Delta Lake suportam o controle de versão por meio do log de transações. O log de transações registra as modificações feitas na tabela, observando o carimbo de data/hora e o número da versão de cada transação. Você pode usar esses dados de versão registrada para exibir versões anteriores da tabela - um recurso conhecido como viagem no tempo.

Você pode recuperar dados de uma versão específica de uma tabela Delta Lake lendo os dados do local da tabela delta em um dataframe, especificando a versão necessária como opção versionAsOf :

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)

Como alternativa, você pode especificar um carimbo de data/hora usando a timestampAsOf opção:

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)