Partager via


Mettre à jour un schéma de table Delta Lake

Delta Lake vous permet de mettre à jour le schéma d’une table. Les types d’applications suivants sont pris en charge :

  • Ajout de nouvelles colonnes (à des positions arbitraires)
  • Réorganisation des colonnes existantes
  • Attribution d’un nouveau nom aux colonnes existantes

Vous pouvez apporter ces modifications explicitement à l’aide de DDL ou implicitement à l’aide de DML.

Important

Une mise à jour d’un schéma de table Delta est une opération qui entre en conflit avec toutes les opérations d’écriture Delta simultanées.

Lorsque vous mettez à jour un schéma de table Delta, les flux qui lisent à partir de cette table se terminent. Si vous souhaitez que le flux continue, vous devez le redémarrer. Pour plus d’informations, consultez Considérations relatives à la production pour les applications Structured Streaming.

Mettre à jour explicitement le schéma pour ajouter des colonnes

ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Par défaut, la nullité est true.

Pour ajouter une colonne à un champ imbriqué, utilisez :

ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

Par exemple, si le schéma avant l’exécution de ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) est :

- root
| - colA
| - colB
| +-field1
| +-field2

le schéma après est :

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

Notes

L’ajout de colonnes imbriquées est pris en charge uniquement pour les structs. Les tableaux et les mappages ne sont pas pris en charge.

Mettre à jour explicitement le schéma pour modifier le commentaire ou l’ordre des colonnes

ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Pour modifier une colonne dans un champ imbriqué, utilisez :

ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

Par exemple, si le schéma avant l’exécution de ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST est :

- root
| - colA
| - colB
| +-field1
| +-field2

le schéma après est :

- root
| - colA
| - colB
| +-field2
| +-field1

Mettre à jour explicitement le schéma pour remplacer les colonnes

ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

Par exemple, lors de l’exécution de la commande DDL suivante :

ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

si le schéma avant est :

- root
| - colA
| - colB
| +-field1
| +-field2

le schéma après est :

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

Mettre à jour explicitement le schéma pour renommer des colonnes

Remarque

Cette fonctionnalité est disponible sur Databricks Runtime 10.4 LTS et versions ultérieures.

Pour renommer des colonnes sans réécrire les données existantes des colonnes, vous devez activer le mappage de colonnes pour la table. Cf. Renommage et suppression des colonnes avec le mappage de colonnes Delta Lake.

Pour renommer une colonne :

ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

Pour renommer un champ imbriqué :

ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

Par exemple, lorsque vous exécutez la commande suivante :

ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

Si le schéma avant est :

- root
| - colA
| - colB
| +-field1
| +-field2

Alors le schéma après est :

- root
| - colA
| - colB
| +-field001
| +-field2

Cf. Renommage et suppression des colonnes avec le mappage de colonnes Delta Lake.

Mettre à jour explicitement le schéma pour supprimer des colonnes

Remarque

Cette fonctionnalité est disponible dans Databricks Runtime 11.3 LTS et versions ultérieures.

Pour supprimer des colonnes en tant qu’opération de métadonnées uniquement sans réécriture de fichiers de données, vous devez activer le mappage de colonnes pour la table. Cf. Renommage et suppression des colonnes avec le mappage de colonnes Delta Lake.

Important

La suppression d’une colonne des métadonnées ne supprime pas les données sous-jacentes de la colonne dans les fichiers. Pour vider les données de colonne supprimées, vous pouvez utiliser REORG TABLE pour réécrire des fichiers. Vous pouvez ensuite utiliser VACUUM pour supprimer physiquement les fichiers qui contiennent les données de colonne supprimées.

Pour supprimer une colonne :

ALTER TABLE table_name DROP COLUMN col_name

Pour supprimer plusieurs colonnes :

ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

Mettre à jour explicitement le schéma pour modifier le type ou le nom de la colonne

Vous pouvez modifier le type ou le nom d’une colonne ou supprimer une colonne en réécrivant la table. Pour ce faire, utilisez l’option overwriteSchema.

L’exemple suivant illustre la modification d’un type de colonne :

(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

L’exemple suivant illustre la modification d’un nom de colonne :

(spark.read.table(...)
  .withColumnRenamed("dateOfBirth", "birthDate")
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)

Activer l’évolution du schéma

Vous pouvez activer l’évolution du schéma en effectuant l’une des opérations suivantes :

Databricks recommande d’activer l’évolution du schéma pour chaque opération d’écriture plutôt que de définir une conf Spark.

Lorsque vous utilisez des options ou une syntaxe pour activer l’évolution du schéma dans une opération d’écriture, cela est prioritaire sur la conf Spark.

Remarque

Il n’existe aucune clause d’évolution de schéma pour les instructions INSERT INTO.

Activer l’évolution du schéma pour les écritures afin d’ajouter de nouvelles colonnes

Les colonnes qui sont présentes dans la requête d’approvisionnement mais qui sont absentes de la table cible sont automatiquement ajoutées dans le cadre d’une transaction d’écriture lorsque l’évolution du schéma est activée. Consultez Activer l’évolution du schéma.

La casse est conservée lors de l’ajout d’une nouvelle colonne. Les nouvelles colonnes sont ajoutées à la fin du schéma de la table. Si les colonnes supplémentaires se trouvent dans un struct, elles sont ajoutées à la fin du struct dans la table cible.

L’exemple suivant illustre l’utilisation de l’option mergeSchema avec le chargeur automatique. Consultez Qu’est-ce que Auto Loader ?.

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)

L’exemple suivant illustre l’utilisation de l’option mergeSchema avec une opération d’écriture par lots :

(spark.read
  .table(source_table)
  .write
  .option("mergeSchema", "true")
  .mode("append")
  .saveAsTable("table_name")
)

Évolution automatique du schéma pour la fusion Delta Lake

L’évolution du schéma permet aux utilisateurs de résoudre les incompatibilités de schéma entre la table cible et la table source dans la fusion. Il gère les deux cas suivants :

  1. Une colonne de la table source n’est pas présente dans la table cible. La nouvelle colonne est ajoutée au schéma cible et ses valeurs sont insérées ou mises à jour à l’aide des valeurs sources.
  2. Une colonne dans la table cible n’est pas présente dans la table source. Le schéma cible reste inchangé ; les valeurs de la colonne cible supplémentaire sont laissées inchangées (pour UPDATE) ou définies sur NULL (pour INSERT).

Vous devez activer manuellement l’évolution automatique du schéma. Consultez Activer l’évolution du schéma.

Remarque

Dans Databricks Runtime 12.2 LTS et versions ultérieures, vous pouvez spécifier les colonnes et les champs struct présents dans la table source par leur nom dans les actions d’insertion ou de mise à jour. Dans Databricks Runtime 11.3 LTS et versions antérieures, seules les actions INSERT * ou UPDATE SET * peuvent être utilisées pour l’évolution du schéma avec la fusion.

Dans Databricks Runtime 13.3 LTS et versions ultérieures, vous pouvez utiliser l’évolution du schéma avec des structs imbriqués à l’intérieur des cartes, comme map<int, struct<a: int, b: int>>.

Syntaxe d’évolution de schéma pour la fusion

Dans Databricks Runtime 15.2 et versions ultérieures, vous pouvez spécifier une évolution de schéma dans une instruction de fusion en utilisant SQL ou les API Table Delta :

SQL

MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Python

from delta.tables import *

(targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)

Scala

import io.delta.tables._

targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()

Exemples d’opérations de fusion avec l’évolution du schéma

Voici quelques exemples des effets de l’opération merge avec et sans évolution du schéma.

Colonnes Requête (dans SQL) Comportement sans évolution du schéma (par défaut) Comportement avec évolution du schéma
Colonnes cibles : key, value

Colonnes sources : key, value, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
Le schéma de la table reste inchangé. Seules les colonnes key et value sont mises à jour/insérées. Le schéma de la table est modifié en (key, value, new_value). Les enregistrements existants avec des correspondances sont mis à jour avec le value et le new_value dans la source. Les nouvelles lignes sont insérées avec le schéma (key, value, new_value).
Colonnes cibles : key, old_value

Colonnes sources : key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *
Les actions UPDATE et INSERT génèrent une erreur parce que la colonne cible old_value ne figure pas dans la source Le schéma de la table est modifié en (key, old_value, new_value). Les enregistrements existants avec des correspondances sont mis à jour avec le new_value dans la source, laissant le old_value inchangé. Les nouveaux enregistrements sont insérés avec les key, new_value et NULL spécifiés pour le old_value.
Colonnes cibles : key, old_value

Colonnes sources : key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET new_value = s.new_value
UPDATE génère une erreur parce que la colonne new_value n’existe pas dans la table cible. Le schéma de la table est modifié en (key, old_value, new_value). Les enregistrements existants avec des correspondances sont mis à jour avec le new_value dans la source, laissant le old_value inchangé, et les enregistrements sans correspondance ont NULL entré pour le new_value. Voir la remarque (1).
Colonnes cibles : key, old_value

Colonnes sources : key, new_value
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
THEN INSERT (key, new_value) VALUES (s.key, s.new_value)
INSERT génère une erreur parce que la colonne new_value n’existe pas dans la table cible. Le schéma de la table est modifié en (key, old_value, new_value). Les nouveaux enregistrements sont insérés avec les key, new_value et NULL spécifiés pour le old_value. Les enregistrements existants ont NULL entré pour new_value, laissant old_value inchangé. Voir la remarque (1).

(1) Ce comportement est disponible dans Databricks Runtime 12.2 LTS et versions ultérieures. Databricks Runtime 11.3 LTS et versions antérieures présentent une erreur dans cette condition.

Exclure des colonnes avec la fusion Delta Lake

Dans Databricks Runtime 12.2 LTS et versions ultérieures, vous pouvez utiliser des clauses EXCEPT dans des conditions de fusion pour exclure explicitement des colonnes. Le comportement du mot clé EXCEPT varie selon que l’évolution du schéma est activée.

Si l’évolution du schéma est désactivée, le mot clé EXCEPT s’applique à la liste des colonnes de la table cible et permet d’exclure les colonnes des actions UPDATE ouINSERT. Les colonnes exclues sont définies sur null.

Si l’évolution du schéma est activée, le mot clé EXCEPT s’applique à la liste des colonnes de la table source et permet d’exclure les colonnes de l’évolution du schéma. Une nouvelle colonne dans la source qui n’est pas présente dans la cible n’est pas ajoutée au schéma cible si elle est répertoriée dans la clause EXCEPT. Les colonnes exclues qui sont déjà présentes dans la cible sont définies sur null.

Les exemples ci-dessous illustrent cette syntaxe :

Colonnes Requête (dans SQL) Comportement sans évolution du schéma (par défaut) Comportement avec évolution du schéma
Colonnes cibles : id, title, last_updated

Colonnes sources : id, title, review, last_updated
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated)
Les lignes correspondantes sont mises à jour en définissant le champ last_updated sur la date actuelle. Les nouvelles lignes sont insérées à l’aide de valeurs pour id et title. Le champ exclu last_updated est défini sur null. Le champ review est ignoré, car il n’est pas dans la cible. Les lignes correspondantes sont mises à jour en définissant le champ last_updated sur la date actuelle. Le schéma est évolué pour ajouter le champ review. Les nouvelles lignes sont insérées à l’aide de tous les champs sources, à l’exception de last_updated, qui est défini sur null.
Colonnes cibles : id, title, last_updated

Colonnes sources : id, title, review, internal_count
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated, internal_count)
INSERT génère une erreur parce que la colonne internal_count n’existe pas dans la table cible. Les lignes correspondantes sont mises à jour en définissant le champ last_updated sur la date actuelle. Le champ review est ajouté à la table cible, mais le champ internal_count est ignoré. Pour les nouvelles lignes insérées, le champ last_updated est défini sur null.

Traitement des colonnes NullType dans les mises à jour de schéma

Comme parquet ne prend pas en charge NullType, NullType les colonnes sont supprimées du tableau lors de l’écriture dans les tables delta, mais elles sont toujours stockées dans le schéma. Lorsqu’un type de données différent est reçu pour cette colonne, Delta Lake fusionne le schéma avec le nouveau type de données. Si Delta Lake reçoit une NullType pour une colonne existante, l’ancien schéma est conservé et la nouvelle colonne est supprimée pendant l’écriture.

NullType la diffusion en continu n’est pas prise en charge. Étant donné que vous devez définir des schémas lors de l’utilisation de la diffusion en continu, cela doit être très rare. NullType n’est pas non plus accepté pour les types complexes tels que ArrayType et MapType .

Remplacer un schéma de table

Par défaut, le remplacement des données dans une table ne remplace pas le schéma. Lors du remplacement d’une table à l’aide mode("overwrite") de sans replaceWhere , vous souhaiterez peut-être quand même remplacer le schéma des données en cours d’écriture. Vous remplacez le schéma et le partitionnement de la table en affectant à l’option overwriteSchema à la true :

df.write.option("overwriteSchema", "true")

Important

Vous ne pouvez pas spécifier overwriteSchema comme true lors de l’utilisation du remplacement de partition dynamique.