Créer des tables Delta Lake
Delta Lake repose sur les tables, qui fournissent une abstraction du stockage relationnel sur les fichiers d’un lac de données.
Création d’une table Delta Lake à partir d’un dataframe
L’une des méthodes les plus simples pour créer une table Delta Lake consiste à enregistrer un dataframe au format delta, en spécifiant un chemin d’accès où les fichiers de données et les informations de métadonnées associées pour la table doivent être stockés.
Par exemple, le code PySpark suivant charge un dataframe avec des données à partir d’un fichier existant, puis enregistre ce dataframe dans un nouvel emplacement de dossier au format delta :
# Load a file into a dataframe
df = spark.read.load('/data/mydata.csv', format='csv', header=True)
# Save the dataframe as a delta table
delta_table_path = "/delta/mydata"
df.write.format("delta").save(delta_table_path)
Après avoir enregistré la table delta, l’emplacement du chemin que vous avez spécifié inclut des fichiers Parquet pour les données (quel que soit le format du fichier source que vous avez chargé dans le dataframe) et un dossier _delta_log contenant le journal des transactions pour la table.
Notes
Le journal des transactions enregistre toutes les modifications de données apportées à la table. En journalisant chaque modification, la cohérence transactionnelle peut être appliquée et les informations de contrôle de version de la table peuvent être conservées.
Vous pouvez remplacer une table Delta Lake existante par le contenu d’un dataframe à l’aide du mode overwrite, comme illustré ici :
new_df.write.format("delta").mode("overwrite").save(delta_table_path)
Vous pouvez également ajouter des lignes d’un dataframe à une table existante à l’aide du mode append :
new_rows_df.write.format("delta").mode("append").save(delta_table_path)
Apport de mises à jour conditionnelles
Bien que vous puissiez apporter des modifications aux données dans un dataframe, puis remplacer une table Delta Lake, un modèle plus courant dans une base de données consiste à insérer, mettre à jour ou supprimer des lignes dans une table existante en tant qu’opérations transactionnelles discrètes. Pour apporter de telles modifications à une table Delta Lake, vous pouvez utiliser l’objet DeltaTable dans l’API Delta Lake, qui prend en charge les opérations de update, delete et merge. Par exemple, vous pouvez utiliser le code suivant pour mettre à jour la colonne price pour toutes les lignes avec une valeur de colonne category d’« Accessories » :
from delta.tables import *
from pyspark.sql.functions import *
# Create a deltaTable object
deltaTable = DeltaTable.forPath(spark, delta_table_path)
# Update the table (reduce price of accessories by 10%)
deltaTable.update(
condition = "Category == 'Accessories'",
set = { "Price": "Price * 0.9" })
Les modifications de données sont enregistrées dans le journal des transactions, et les nouveaux fichiers Parquet sont créés dans le dossier des tables selon les besoins.
Conseil
Pour plus d’informations sur l’utilisation de l’API Delta Lake, consultez la documentation de l’API Delta Lake.
Interrogation de la version précédente d’une table
Les tables Delta Lake prennent en charge le contrôle de version via le journal des transactions. Le journal des transactions enregistre les modifications apportées à la table, en notant le timestamp et le numéro de version pour chaque transaction. Vous pouvez utiliser ces données de version journalisées pour afficher les versions précédentes de la table, une fonctionnalité appelée voyage dans le temps.
Vous pouvez récupérer des données à partir d’une version spécifique d’une table Delta Lake en lisant les données de l’emplacement de la table delta dans un dataframe, en spécifiant la version requise comme option versionAsOf
:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
Vous pouvez également spécifier un timestamp à l’aide de l’option timestampAsOf
:
df = spark.read.format("delta").option("timestampAsOf", '2022-01-01').load(delta_table_path)