Usare tabelle delta in Spark

Completato

È possibile usare tabelle delta (o file di formato delta) per recuperare e modificare i dati in diversi modi.

Uso di Spark SQL

Il modo più comune per usare i dati nelle tabelle delta in Spark consiste nell'usare Spark SQL. È possibile incorporare istruzioni SQL in altri linguaggi, ad esempio PySpark o Scala, usando la libreria spark.sql . Ad esempio, il codice seguente inserisce una riga nella tabella products .

spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")

In alternativa, è possibile usare il %%sql magic in un notebook per eseguire istruzioni SQL.

%%sql

UPDATE products
SET Price = 2.49 WHERE ProductId = 1;

Usare l'API Delta

Quando si vogliono usare file differenziali anziché tabelle di catalogo, potrebbe essere più semplice usare l'API Delta Lake. È possibile creare un'istanza di deltaTable da un percorso di cartella contenente file in formato differenziale e quindi usare l'API per modificare i dati nella tabella.

from delta.tables import *
from pyspark.sql.functions import *

# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

Usare il tempo di spostamento per lavorare con il controllo delle versioni delle tabelle

Le modifiche apportate alle tabelle delta vengono registrate nel log delle transazioni per la tabella. È possibile usare le transazioni registrate per visualizzare la cronologia delle modifiche apportate alla tabella e per recuperare le versioni precedenti dei dati (noto come tempo di spostamento)

Per visualizzare la cronologia di una tabella, è possibile usare il DESCRIBE comando SQL come illustrato di seguito.

%%sql

DESCRIBE HISTORY products

I risultati di questa istruzione mostrano le transazioni applicate alla tabella, come illustrato di seguito (alcune colonne sono state omesse):

version timestamp operation (operazione) operationParameters
2 2023-04-04T21:46:43Z UPDATE {"predicate":"(ProductId = 1)"}
1 2023-04-04T21:42:48Z WRITE {"mode":"Append","partitionBy":"[]"}
0 2023-04-04T20:04:23Z CREATE TABLE {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"}

Per visualizzare la cronologia di una tabella esterna, è possibile specificare il percorso della cartella anziché il nome della tabella.

%%sql

DESCRIBE HISTORY 'Files/mytable'

È possibile recuperare dati da una versione specifica dei dati leggendo il percorso del file differenziale in un dataframe, specificando la versione richiesta come versionAsOf opzione:

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

In alternativa, è possibile specificare un timestamp usando l'opzione timestampAsOf:

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)