Usare tabelle delta in Spark
È possibile usare tabelle delta (o file di formato delta) per recuperare e modificare i dati in diversi modi.
Uso di Spark SQL
Il modo più comune per usare i dati nelle tabelle delta in Spark consiste nell'usare Spark SQL. È possibile incorporare istruzioni SQL in altri linguaggi, ad esempio PySpark o Scala, usando la libreria spark.sql . Ad esempio, il codice seguente inserisce una riga nella tabella products .
spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")
In alternativa, è possibile usare il %%sql
magic in un notebook per eseguire istruzioni SQL.
%%sql
UPDATE products
SET Price = 2.49 WHERE ProductId = 1;
Usare l'API Delta
Quando si vogliono usare file differenziali anziché tabelle di catalogo, potrebbe essere più semplice usare l'API Delta Lake. È possibile creare un'istanza di deltaTable da un percorso di cartella contenente file in formato differenziale e quindi usare l'API per modificare i dati nella tabella.
from delta.tables import *
from pyspark.sql.functions import *
# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })
Usare il tempo di spostamento per lavorare con il controllo delle versioni delle tabelle
Le modifiche apportate alle tabelle delta vengono registrate nel log delle transazioni per la tabella. È possibile usare le transazioni registrate per visualizzare la cronologia delle modifiche apportate alla tabella e per recuperare le versioni precedenti dei dati (noto come tempo di spostamento)
Per visualizzare la cronologia di una tabella, è possibile usare il DESCRIBE
comando SQL come illustrato di seguito.
%%sql
DESCRIBE HISTORY products
I risultati di questa istruzione mostrano le transazioni applicate alla tabella, come illustrato di seguito (alcune colonne sono state omesse):
version | timestamp | operation (operazione) | operationParameters |
---|---|---|---|
2 | 2023-04-04T21:46:43Z | UPDATE | {"predicate":"(ProductId = 1)"} |
1 | 2023-04-04T21:42:48Z | WRITE | {"mode":"Append","partitionBy":"[]"} |
0 | 2023-04-04T20:04:23Z | CREATE TABLE | {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"} |
Per visualizzare la cronologia di una tabella esterna, è possibile specificare il percorso della cartella anziché il nome della tabella.
%%sql
DESCRIBE HISTORY 'Files/mytable'
È possibile recuperare dati da una versione specifica dei dati leggendo il percorso del file differenziale in un dataframe, specificando la versione richiesta come versionAsOf
opzione:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
In alternativa, è possibile specificare un timestamp usando l'opzione timestampAsOf
:
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)