Trabalhar com tabelas Delta no Spark

Concluído

É possível trabalhar com tabelas Delta (ou arquivos no formato delta) para recuperar e modificar dados de diversas maneiras.

Usar o Spark SQL

A maneira mais comum de trabalhar com dados em tabelas Delta no Spark é usar o Spark SQL. É possível inserir instruções SQL em outras linguagens (como PySpark ou Scala) usando a biblioteca spark.sql. Por exemplo, o código a seguir insere uma linha na tabela de produtos.

spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")

Como alternativa, é possível usar a mágica %%sql em um notebook para executar instruções SQL.

%%sql

UPDATE products
SET Price = 2.49 WHERE ProductId = 1;

Usar a API Delta

Ao trabalhar com arquivos delta em vez de tabelas de catálogo, pode ser mais simples usar a API do Delta Lake. É possível criar uma instância de uma DeltaTable com base em um local de pasta que contém arquivos no formato delta e usar a API para modificar os dados na tabela.

from delta.tables import *
from pyspark.sql.functions import *

# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

Usar a viagem no tempo para trabalhar com o controle de versão de tabela

As modificações feitas nas tabelas Delta são registradas no log de transações da tabela. É possível usar as transações registradas para ver o histórico das alterações feitas na tabela e recuperar versões mais antigas dos dados (um processo conhecido como viagem no tempo)

Para ver o histórico de uma tabela, é possível usar o comando SQL DESCRIBE, conforme mostrado aqui.

%%sql

DESCRIBE HISTORY products

Os resultados dessa declaração mostram as transações que foram aplicadas à tabela, conforme mostrado aqui (algumas colunas foram omitidas):

version timestamp operation operationParameters
2 2023-04-04T21:46:43Z UPDATE {"predicate":"(ProductId = 1)"}
1 2023-04-04T21:42:48Z WRITE {"mode":"Append","partitionBy":"[]"}
0 2023-04-04T20:04:23Z CREATE TABLE {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"}

Para ver o histórico de uma tabela externa, é possível especificar o local da pasta em vez do nome dela.

%%sql

DESCRIBE HISTORY 'Files/mytable'

É possível recuperar dados de uma versão específica dos dados lendo a localização do arquivo delta em um dataframe, especificando a versão necessária como uma opção versionAsOf:

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

Como alternativa, você pode especificar um carimbo de data/hora usando a opção timestampAsOf:

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)