Trabalhar com tabelas delta no Spark
Você pode trabalhar com tabelas delta (ou arquivos de formato delta) para recuperar e modificar dados de várias maneiras.
Usando o Spark SQL
A maneira mais comum de trabalhar com dados em tabelas delta no Spark é usar o Spark SQL. Você pode incorporar instruções SQL em outros idiomas (como PySpark ou Scala) usando a biblioteca spark.sql . Por exemplo, o código a seguir insere uma linha na tabela de produtos .
spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")
Como alternativa, você pode usar a %%sql
mágica em um bloco de anotações para executar instruções SQL.
%%sql
UPDATE products
SET Price = 2.49 WHERE ProductId = 1;
Usar a API Delta
Quando você deseja trabalhar com arquivos delta em vez de tabelas de catálogo, pode ser mais simples usar a API Delta Lake. Você pode criar uma instância de um DeltaTable a partir de um local de pasta contendo arquivos no formato delta e, em seguida, usar a API para modificar os dados na tabela.
from delta.tables import *
from pyspark.sql.functions import *
# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })
Use a viagem no tempo para trabalhar com o controle de versão de tabela
As modificações feitas nas tabelas delta são registradas no log de transações da tabela. Você pode usar as transações registradas para exibir o histórico de alterações feitas na tabela e recuperar versões mais antigas dos dados (conhecidas como viagem no tempo)
Para ver o histórico de uma tabela, você pode usar o DESCRIBE
comando SQL como mostrado aqui.
%%sql
DESCRIBE HISTORY products
Os resultados desta declaração mostram as transações que foram aplicadas à tabela, como mostrado aqui (algumas colunas foram omitidas):
versão | carimbo de data/hora | operation | operationParameters |
---|---|---|---|
2 | 2023-04-04T21:46:43Z | ATUALIZAR | {"predicado":"(ProductId = 1)"} |
1 | 2023-04-04T21:42:48Z | ESCREVER | {"modo":"Append","partitionBy":"[]"} |
0 | 2023-04-04T20:04:23Z | CREATE TABLE | {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"} |
Para ver o histórico de uma tabela externa, você pode especificar o local da pasta em vez do nome da tabela.
%%sql
DESCRIBE HISTORY 'Files/mytable'
Você pode recuperar dados de uma versão específica dos dados lendo o local do arquivo delta em um dataframe, especificando a versão necessária como opção versionAsOf
:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
Como alternativa, você pode especificar um carimbo de data/hora usando a timestampAsOf
opção:
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)