Utiliser des tables delta dans Spark

Effectué

Vous pouvez utiliser des tables delta (ou des fichiers de format delta) pour récupérer et modifier des données de plusieurs manières.

Utilisation de Spark SQL

La façon la plus courante d’utiliser des données dans des tables delta dans Spark consiste à utiliser Spark SQL. Vous pouvez incorporer des instructions SQL dans d’autres langages (tels que PySpark ou Scala) en utilisant la bibliothèque spark.sql. Par exemple, le code suivant insère une ligne dans la table products.

spark.sql("INSERT INTO products VALUES (1, 'Widget', 'Accessories', 2.99)")

Vous pouvez également utiliser la commande magic %%sql dans un notebook pour exécuter des instructions SQL.

%%sql

UPDATE products
SET Price = 2.49 WHERE ProductId = 1;

Utiliser l’API Delta

Quand vous souhaitez utiliser des fichiers delta plutôt que des tables de catalogue, il peut être plus simple d’utiliser l’API Delta Lake. Vous pouvez créer une instance d’une DeltaTable à partir d’un emplacement de dossier contenant des fichiers au format delta, puis utiliser l’API pour modifier les données de la table.

from delta.tables import *
from pyspark.sql.functions import *

# Create a DeltaTable object
delta_path = "Files/mytable"
deltaTable = DeltaTable.forPath(spark, delta_path)

# Update the table (reduce price of accessories by 10%)
deltaTable.update(
    condition = "Category == 'Accessories'",
    set = { "Price": "Price * 0.9" })

Recourir au voyage dans le temps pour utiliser la gestion de versions de table

Les modifications apportées aux tables delta sont enregistrées dans le journal des transactions de la table concernée. Vous pouvez utiliser les transactions journalisées pour afficher l’historique des modifications apportées à la table et récupérer des versions antérieures des données (processus appelé voyage dans le temps)

Pour voir l’historique d’une table, vous pouvez utiliser la commande SQL DESCRIBE comme indiqué ici.

%%sql

DESCRIBE HISTORY products

Le résultat de cette instruction indique les transactions qui ont été appliquées à la table, comme illustré ici (certaines colonnes ont été omises) :

version timestamp opération operationParameters
2 2023-04-04T21:46:43Z UPDATE {"predicate":"(ProductId = 1)"}
1 2023-04-04T21:42:48Z WRITE {"mode":"Append","partitionBy":"[]"}
0 2023-04-04T20:04:23Z CREATE TABLE {"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"}

Pour voir l’historique d’une table externe, vous pouvez spécifier l’emplacement du dossier au lieu du nom de la table.

%%sql

DESCRIBE HISTORY 'Files/mytable'

Vous pouvez récupérer des données à partir d’une version spécifique des données en lisant l’emplacement du fichier delta dans un dataframe, en spécifiant la version requise comme option versionAsOf :

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)

Vous pouvez également spécifier un timestamp à l’aide de l’option timestampAsOf :

df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_path)