Partager via


API APPLY CHANGES : Simplifier la capture des changements de données dans Delta Live Tables

Delta Live Tables simplifie la capture de changements de données (CDC) avec le APPLY CHANGES et les APPLY CHANGES FROM SNAPSHOT APIs. L’interface que vous utilisez dépend de la source des données modifiées :

  • Utilisez APPLY CHANGES pour traiter les modifications d’un flux de données de modification (CDF).
  • Utilisez APPLY CHANGES FROM SNAPSHOT (préversion publique) pour traiter les modifications apportées aux instantanés de base de données.

Auparavant, l’instruction MERGE INTO était couramment utilisée pour traiter les enregistrements CDC sur Azure Databricks. Toutefois, MERGE INTO peut produire des résultats incorrects en raison d’enregistrements hors séquence ou requiert une logique complexe pour réorganiser les enregistrements.

L’API APPLY CHANGES est prise en charge dans les interfaces SQL et Python de Delta Live Tables. L’API APPLY CHANGES FROM SNAPSHOT est prise en charge dans l’interface Python de Delta Live Tables.

Les deux APPLY CHANGES et APPLY CHANGES FROM SNAPSHOT prennent en charge la mise à jour des tables à l’aide du type SCD 1 et du type 2 :

  • Utilisez la méthode SCD de type 1 pour mettre à jour les enregistrements directement. L’historique n’est pas conservé pour les enregistrements mis à jour.
  • Utilisez SCD type 2 pour conserver un historique des enregistrements, soit sur toutes les mises à jour, soit sur les mises à jour d’un ensemble de colonnes spécifié.

Pour connaître la syntaxe et d’autres références, consultez :

Remarque

Cet article explique comment mettre à jour les tables dans votre pipeline Delta Live Tables en fonction des modifications apportées aux données sources. Pour savoir comment enregistrer et interroger des informations sur les changements au niveau des lignes pour les tables Delta, consultez Utiliser le flux des changements de données Delta Lake sur Azure Databricks.

Spécifications

Pour utiliser les API CDC, votre pipeline doit être configuré pour utiliser pipelines DLT serverless ou delta Live Tables Pro ou Advanced éditions.

Comment CDC est-elle implémentée avec le APPLY CHANGES API ?

En gérant automatiquement les enregistrements hors séquence, l’API APPLY CHANGES dans Delta Live Tables garantit le traitement correct des enregistrements CDC et supprime la nécessité de développer une logique complexe pour gérer les enregistrements hors séquence. Vous devez spécifier une colonne sur laquelle séquencer des enregistrements dans les données sources, que Delta Live Tables interprète comme une représentation monotone croissante de l’ordre correct des données sources. Delta Live Tables gère automatiquement les données qui arrivent dans le désordre. Pour les modifications SCD de type 2, Delta Live Tables propage les valeurs de séquençage appropriées aux tables __START_AT et __END_AT colonnes cibles. Il devrait y avoir une mise à jour distincte par clé pour chaque valeur de séquencement, et les valeurs de séquencement NULL ne sont pas prises en charge.

Pour effectuer un traitement CDC avec APPLY CHANGES, vous devez d’abord créer une table de flux, puis utiliser l’instruction SQL APPLY CHANGES INTO ou la fonction Python apply_changes() pour spécifier la source, les clés et le séquençage du flux de modifications. Pour créer la table de diffusion cible, utilisez l’instruction CREATE OR REFRESH STREAMING TABLE dans SQL ou la fonction create_streaming_table() dans Python. Consultez les exemples de traitement SCD de type 1 et de type 2.

Pour plus d’informations sur la syntaxe, consultez la référence Delta Live Tables SQL ou référence Python.

Comment CDC est-elle implémentée avec l’API APPLY CHANGES FROM SNAPSHOT ?

Important

L’API APPLY CHANGES FROM SNAPSHOT est en préversion publique.

APPLY CHANGES FROM SNAPSHOT est une API déclarative qui détermine efficacement les modifications apportées aux données sources en comparant une série d’instantanés dans l’ordre, puis exécute le traitement requis pour le traitement CDC des enregistrements dans les instantanés. APPLY CHANGES FROM SNAPSHOT n’est pris en charge que par l’interface Python de Delta Live Tables.

APPLY CHANGES FROM SNAPSHOT prend en charge l’ingestion d’instantanés à partir de plusieurs types sources :

  • Utilisez l’ingestion périodique d’instantanés pour ingérer des instantanés à partir d’une table ou d’une vue existante. APPLY CHANGES FROM SNAPSHOT dispose d’une interface simple et simplifiée pour prendre en charge l’ingestion périodique d’instantanés à partir d’un objet de base de données existant. Un nouvel instantané est ingéré avec chaque mise à jour du pipeline et le temps d’ingestion est utilisé comme version de l’instantané. Lorsqu’un pipeline est exécuté en mode continu, plusieurs instantanés sont ingérés avec chaque mise à jour de pipeline sur une période déterminée par le paramètre de l’intervalle de déclencheur pour le flux qui contient le traitement APPLY CHANGES FROM SNAPSHOT.
  • Utilisez l’ingestion d’instantanés historiques pour traiter les fichiers contenant des instantanés de base de données, tels que les instantanés générés à partir d’une base de données Oracle ou MySQL ou d’un entrepôt de données.

Pour effectuer le traitement CDC à partir de n’importe quel type source avec APPLY CHANGES FROM SNAPSHOT, vous créez d’abord une table de diffusion en continu, puis utilisez la fonction apply_changes_from_snapshot() en Python pour spécifier l’instantané, les clés et d’autres arguments requis pour implémenter le traitement. Voir les exemples d’ingestion d’instantanés périodiques et d’ingestion d’instantanés historiques.

Les instantanés passés à l’API doivent être dans l’ordre croissant par version. Si Delta Live Tables détecte un instantané hors commande, une erreur est levée.

Pour plus de détails sur la syntaxe, voir la référence Python des tables dynamiques Delta.

Limites

La colonne utilisée pour le séquencement doit être un type de données triable.

Exemple : traitement SCD type 1 et SCD type 2 avec des données sources CDF

Les sections suivantes fournissent des exemples de requêtes SCD Delta Live Tables de type 1 et de type 2 qui mettent à jour les tables cibles sur la base d’événements source provenant d’un flux de données de modification :

  1. Créent des enregistrements d’utilisateur(-trice).
  2. Suppriment un enregistrement d’utilisateur(-trice).
  3. Mettent à jour les enregistrements d’utilisateur(-trice). Dans la méthode SCD de type 1, les dernières opérations UPDATE arrivent en retard et sont supprimées de la table cible, montrant ainsi la gestion des événements qui se produisent dans le désordre.

Les exemples suivants supposent que vous connaissez bien la configuration et la mise à jour des pipelines Delta Live Tables. Consulter Tutoriel : Exécuter votre premier pipeline Delta Live Tables.

Pour exécuter ces exemples, vous devez commencer par créer un exemple de jeu de données. Consulter Générer des données de test.

Voici les enregistrements d’entrée de ces exemples :

userId name city opération sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancun INSERT 2
123 null null Suppression 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Si vous supprimez les marques de commentaire de la ligne finale dans les exemples de données, l’enregistrement suivant qui spécifie où les enregistrements doivent être tronqués sera inséré :

userId name city opération sequenceNum
null null null TRUNCATE 3

Remarque

Tous les exemples suivants incluent des options permettant de spécifier les opérations DELETE et TRUNCATE , mais chacune est facultative.

Traiter les mises à jour de SCD de type 1

L’exemple suivant illustre le traitement des mises à jour de SCD de type 1 :

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Après exécution de l’exemple SCD de type 1, la table cible contient les enregistrements suivants :

userId name city
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lily Cancun

Après avoir exécuté l’exemple du type SCD 1 avec l’enregistrement TRUNCATE supplémentaire, les enregistrements 124 et 126 sont tronqués en raison de l’opération TRUNCATE sur sequenceNum=3, et la table cible contient l’enregistrement suivant :

userId name city
125 Mercedes Guadalajara

Traiter les mises à jour de SCD de type 2

L’exemple suivant illustre le traitement des mises à jour de SCD de type 2 :

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Après exécution de l’exemple SCD de type 2, la table cible contient les enregistrements suivants :

userId name city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 null
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lily Cancun 2 null

Une requête de type 2 SCD peut aussi indiquer un sous-ensemble de colonnes de sortie à suivre pour l’historique dans la table cible. Les modifications apportées à d’autres colonnes sont mises à jour au lieu de générer de nouveaux enregistrements d’historique. L’exemple suivant illustre l’exclusion de la colonne city du suivi :

L’exemple suivant illustre l’utilisation de l’historique de suivi avec le SCD de type 2 :

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Après exécution de cet exemple sans l’enregistrement TRUNCATE supplémentaire, la table cible contient les enregistrements suivants :

userId name city __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 null
125 Mercedes Guadalajara 2 null
126 Lily Cancun 2 null

Générer des données de test

Le code ci-dessous est fourni pour générer un exemple de jeu de données à utiliser dans les exemples de requêtes présents dans ce tutoriel. En supposant que vous disposez des informations d’identification appropriées pour créer un schéma et créer une table, vous pouvez exécuter ces instructions avec un notebook ou Databricks SQL. Le code suivant n’est pas destiné à être exécuté dans le cadre d’un pipeline Delta Live Tables :

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Exemple : traitement périodique des instantanés

L’exemple suivant illustre le traitement SCD de type 2 qui ingère des instantanés d’une table stockée à mycatalog.myschema.mytable. Les résultats du traitement sont écrits dans une table nommée target.

mycatalog.myschema.mytable enregistrements au timestamp 2024-01-01 00:00:00

Clé Valeur
1 a1
2 a2

mycatalog.myschema.mytable enregistrements au timestamp 2024-01-01 12:00:00

Clé Valeur
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

Après traitement des instantanés, la table cible contient les enregistrements suivants :

Clé Valeur __START_AT __END_AT
1 a1 01-01-2024 00:00:00 01-01-2024 12:00:00
2 a2 01-01-2024 00:00:00 01-01-2024 12:00:00
2 b2 01-01-2024 12:00:00 null
3 a3 01-01-2024 12:00:00 null

Exemple de : traitement d’instantanés historiques

L’exemple suivant illustre le traitement SCD de type 2 qui met à jour une table cible en fonction des événements sources de deux instantanés stockés dans un système de stockage cloud :

Instantanéà timestamp, stocké dans /<PATH>/filename1.csv

Clé TrackingColumn NonTrackingColumn
1 a1 b1
2 a2 b2
4 a4 b4

Instantanéà timestamp + 5, stocké dans /<PATH>/filename2.csv

Clé TrackingColumn NonTrackingColumn
2 a2_new b2
3 a3 b3
4 a4 b4_new

L’exemple de code suivant illustre le traitement des mises à jour de SCD de type 2 avec ces instantanés :

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

Après traitement des instantanés, la table cible contient les enregistrements suivants :

Clé TrackingColumn NonTrackingColumn __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 null
3 a3 b3 2 null
4 a4 b4_new 1 null

Ajouter, modifier ou supprimer des données dans une table de streaming

Si votre pipeline publie des tables sur le catalogue Unity, vous pouvez utiliser des instructions de langage de manipulation de données (DML), notamment les instructions d’insertion, de mise à jour, de suppression et de fusion, pour modifier les tables de diffusion cible créées par des instructions APPLY CHANGES INTO.

Remarque

  • Les instructions DML qui modifient le schéma de table d’une table de streaming ne sont pas prises en charge. Assurez-vous que vos instructions DML ne tentent pas de faire évoluer le schéma de table.
  • Les instructions DML qui mettent à jour une table de streaming ne peuvent être exécutées que dans un cluster Unity Catalog partagé ou un entrepôt SQL à l’aide de Databricks Runtime 13.3 LTS et versions ultérieures.
  • Étant donné que le streaming nécessite des sources de données en ajout uniquement, si votre traitement nécessite le streaming à partir d'une table source de streaming avec des modifications (par exemple, via des instructions DML), définissez l'indicateur skipChangeCommits lors de la lecture de la table de streaming source. Lorsque skipChangeCommits est défini, les transactions qui suppriment ou modifient des enregistrements sur la table source sont ignorées. Si votre traitement ne nécessite pas de table de diffusion, vous pouvez utiliser une vue matérialisée (qui n’a pas la restriction d’ajout uniquement) comme table cible.

Étant donné que Delta Live Tables utilise une colonne SEQUENCE BY spécifiée et propage les valeurs de séquencement appropriées aux colonnes __START_AT et __END_AT de la table cible (pour le SCD de type 2), vous devez vous assurer que les instructions DML utilisent des valeurs valides pour ces colonnes afin de conserver l’ordre approprié des enregistrements. Voir Comment le CDC est-il mis en œuvre avec l’API APPLY CHANGES ?.

Pour plus d’informations sur l’utilisation d’instructions DML avec des tables de diffusion, consultez Ajouter, modifier ou supprimer des données dans une table de diffusion.

L’exemple suivant insère un enregistrement actif avec une séquence de début de 5 :

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Lire un flux de données modifiées à partir d’une table cible APPLY CHANGES

Dans Databricks Runtime 15.2 et versions ultérieures, vous pouvez lire un flux de données modifiées à partir d’une table de diffusion en continu qui est la cible de requêtes APPLY CHANGES ou APPLY CHANGES FROM SNAPSHOT comme vous lisez un flux de données de modifiées à partir d’autres tables Delta. Les éléments suivants sont nécessaires pour lire le flux de données modifiées à partir d’une table de diffusion en continu cible :

  • La table de diffusion en continu cible doit être publiée dans Unity Catalog. Consultez Utiliser Unity Catalog avec vos pipelines Delta Live Tables.
  • Pour lire le flux de données modifiées de la table de diffusion en continu cible, vous devez utiliser Databricks Runtime 15.2 ou version ultérieure. Pour lire le flux de données de modifiées dans un autre pipeline Delta Live Tables, le pipeline doit être configuré pour utiliser Databricks Runtime 15.2 ou version ultérieure.

Vous lisez le flux de données modifiées à partir d’une table de diffusion en continu cible créée dans un pipeline Delta Live Tables de la même façon que la lecture d’un flux de données modifiées à partir d’autres tables Delta. Pour en savoir plus sur l’utilisation de la fonctionnalité de flux de données modifiées Delta, notamment des exemples en Python et SQL, consultez Utiliser le flux de données modifiées Delta Lake sur Azure Databricks.

Remarque

L’enregistrement de flux de données modifiées inclut des métadonnées identifiant le type d’événement modifié. Lorsqu’un enregistrement est mis à jour dans une table, les métadonnées des enregistrements de modification associés comprennent généralement les valeurs _change_type définies sur les événements update_preimage et update_postimage.

Toutefois, les valeurs _change_type sont différentes si les mises à jour sont apportées à la table de diffusion en continu cible qui inclut la modification des valeurs de clé primaire. Lorsque les modifications incluent des mises à jour des clés primaires, les champs de métadonnées _change_type sont définis sur les événements insert et delete. Les modifications apportées aux clés primaires peuvent être effectuées lorsque des mises à jour manuelles sont apportées à l’un des champs de clé par le biais d’une instruction UPDATE ou MERGE ou encore, pour les tables SCD type 2, lorsque le champ __start_at change pour refléter une valeur de séquence de début antérieure.

La requête APPLY CHANGES détermine les valeurs de clé primaire, qui diffèrent pour le traitement SCD type 1 et SCD type 2 :

  • Pour le traitement SCD type 1 et l’interface Python Delta Live Tables, la clé primaire est la valeur du paramètre keys dans la fonction apply_changes(). Pour l’interface SQL Delta Live Tables, la clé primaire est les colonnes définies par la clause KEYS dans l’instruction APPLY CHANGES INTO.
  • Pour le SCD type 2, la clé primaire est le paramètre keys ou la clause KEYS plus la valeur renvoyée de l’opération coalesce(__START_AT, __END_AT), pour laquelle __START_AT et __END_AT sont les colonnes correspondantes de la table de diffusion en continu cible.

Obtenir des données sur les enregistrements traités par une requête CDC Delta Live Tables

Remarque

Les indicateurs suivants ne sont capturés que par APPLY CHANGES requêtes et non par APPLY CHANGES FROM SNAPSHOT requêtes.

Les métriques suivantes sont capturées par les requêtes APPLY CHANGES :

  • num_upserted_rows : nombre de lignes de sortie mises à jour/insérées dans le jeu de données.
  • num_deleted_rows : nombre de lignes de sortie existantes supprimées du jeu de données pendant une mise à jour.

L’indicateur num_output_rows, sortie pour les flux non CDC, n’est pas capturée pour les requêtes apply changes.

Quels objets de données sont utilisés pour le traitement CDC Delta Live Tables ?

Remarque : Les structures de données suivantes s’appliquent uniquement au traitement APPLY CHANGES, pas au traitement APPLY CHANGES FROM SNAPSHOT.

Lorsque vous déclarez la table cible dans le metastore Hive, deux structures de données sont créées :

  • Une vue utilisant le nom attribué à la table cible.
  • Une table de stockage interne utilisée par Delta Live Tables pour gérer le traitement CDC. Cette table est nommée en rajoutant __apply_changes_storage_ au début du nom de la table cible.

Par exemple, si vous déclarez une table cible nommée dlt_cdc_target, vous verrez une vue nommée dlt_cdc_target et une table nommée __apply_changes_storage_dlt_cdc_target dans le metastore. La création d’une vue permet à Delta Live Tables de filtrer les informations supplémentaires (par exemple, les objets tombstone et les versions) requises pour gérer les données désordonnées. Pour afficher les données traitées, interrogez la vue cible. Étant donné que le schéma de la table __apply_changes_storage_ peut changer pour prendre en charge les fonctionnalités ou améliorations futures, vous ne devez pas interroger la table pour une utilisation en production. Si vous ajoutez manuellement des données à la table, les enregistrements sont supposés venir avant d’autres modifications, car les colonnes de version sont manquantes.

Si un pipeline publie quelque chose dans le catalogue Unity, les tables de stockage internes ne sont pas accessibles aux utilisateurs.