Informations de référence sur le langage Python dans Delta Live Tables
Cet article contient des détails sur l’interface de programmation Python de Delta Live Tables.
Pour plus d’informations sur l’API SQL, consultez Informations de référence sur le langage SQL dans Delta Live Tables.
Pour plus de détails spécifiques à la configuration d’Auto Loader, voir Qu’est-ce qu’Auto Loader ?.
Avant de commencer
Les considérations suivantes sont importantes quand vous implémentez des pipelines avec l’interface Python de Delta Live Tables :
- Étant donné que les fonctions Python
table()
etview()
sont appelées plusieurs fois pendant la planification et l’exécution de la mise à jour d’un pipeline, n’incluez pas de code dans une de ces fonctions qui serait susceptible d’avoir des effets secondaires (par exemple, du code qui modifie des données ou envoie un e-mail). Pour éviter un comportement inattendu, vos fonctions Python qui définissent des jeux de données doivent inclure seulement le code nécessaire pour définir la table ou la vue. - Pour effectuer des opérations telles que l’envoi d’e-mails ou l’intégration à un service de surveillance externe, en particulier dans les fonctions qui définissent des jeux de données, utilisez des hooks d’événements. L’implémentation de ces opérations dans les fonctions qui définissent vos jeux de données va provoquer un comportement inattendu.
- Les fonctions Python
table
etview
doivent retourner un DataFrame. Certaines fonctions qui fonctionnent sur des DataFrames ne retournent pas de DataFrames et ne doivent pas être utilisées. Ces opérations incluent des fonctions telles quecollect()
,count()
,toPandas()
,save()
etsaveAsTable()
. Étant donné que les transformations DataFrame sont exécutées une fois le graphe de flux de données complet résolu, l’utilisation de telles opérations peut avoir des effets secondaires inattendus.
Importer le module Python dlt
Les fonctions Python Delta Live Tables sont définies dans le module dlt
. Vos pipelines implémentés avec l’API Python doivent importer ce module :
import dlt
Créer une vue matérialisée ou une table de streaming Delta Live Tables
En Python, Delta Live Tables détermine s'il convient de mettre à jour un ensemble de données en tant que vue matérialisée ou table de streaming en fonction de la requête de définition. Le décorateur @table
peut être utilisé pour définir à la fois des vues matérialisées et des tables de diffusion en continu.
Pour définir une vue matérialisée en Python, appliquez @table
à une requête qui effectue une lecture statique sur une source de données. Pour définir une table de diffusion en continu, appliquez @table
à une requête qui effectue une lecture en continu sur une source de données ou utilisez la fonction create_streaming_table(). Les deux types d’ensembles de données ont la même spécification de syntaxe comme suit :
Remarque
Pour utiliser l’argument cluster_by
afin d’activer le clustering liquide, votre pipeline doit être configuré pour utiliser la chaîne de prévisualisation.
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Créer une vue Delta Live Tables
Pour définir une vue dans Python, appliquez l'élément décoratif @view
. Comme le décorateur @table
, vous pouvez utiliser des vues dans Delta Live Tables pour des ensembles de données statiques ou en streaming. Voici la syntaxe pour définir des vues avec Python :
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Exemple : Définir des tables et des vues
Pour définir une table ou une vue en Python, appliquez le décorateur @dlt.view
ou @dlt.table
à une fonction. Vous pouvez utiliser le nom de la fonction ou le paramètre name
pour attribuer le nom de la table ou de la vue. L’exemple suivant définit deux jeux de données différents : une vue appelée taxi_raw
qui prend un fichier JSON comme source d’entrée, et une table appelée filtered_data
qui prend la vue taxi_raw
comme entrée :
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return spark.read.table("LIVE.taxi_raw").where(...)
Exemple : accéder à un ensemble de données défini dans le même pipeline
Remarque
Bien que les dlt.read()
fonctions et dlt.read_stream()
les fonctions soient toujours disponibles et entièrement prises en charge par l’interface Python Delta Live Tables, Databricks recommande toujours d’utiliser les fonctions et spark.readStream.table()
les spark.read.table()
fonctions en raison des éléments suivants :
- Les
spark
fonctions prennent en charge la lecture de jeux de données internes et externes, y compris les jeux de données dans le stockage externe ou définis dans d’autres pipelines. Lesdlt
fonctions prennent uniquement en charge la lecture de jeux de données internes. - Les
spark
fonctions prennent en charge la spécification d’options, telles queskipChangeCommits
, pour lire les opérations. La spécification des options n’est pas prise en charge par lesdlt
fonctions.
Pour accéder à un jeu de données défini dans le même pipeline, utilisez le ou spark.readStream.table()
les spark.read.table()
fonctions, en préparant le LIVE
mot clé au nom du jeu de données :
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return spark.read.table("LIVE.customers_raw").where(...)
Exemple : lecture à partir d'une table enregistrée dans un métastore
Pour lire les données d'une table enregistrée dans le metastore Hive, dans l'argument de la fonction, omettez le mot-clé LIVE
et qualifiez éventuellement le nom de la table avec le nom de la base de données :
@dlt.table
def customers():
return spark.read.table("sales.customers").where(...)
Pour obtenir un exemple de lecture à partir d'une table Unity Catalog, consultez Ingérer des données dans un pipeline Unity Catalog.
Exemple : Accéder à un jeu de données à l’aide de spark.sql
Vous pouvez également retourner un jeu de données à l’aide d’une expression spark.sql
dans une fonction de requête. Pour lire dans un jeu de données interne, ajoutez LIVE.
au début le nom du jeu de données :
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
Créer une table à utiliser comme cible des opérations de streaming
Utilisez la fonction create_streaming_table()
pour créer une table cible pour la sortie des enregistrements par les opérations de diffusion en continu, y compris les enregistrements de sortie apply_changes(), apply_changes_from_snapshot() et @append_flow.
Remarque
Les fonctions create_target_table()
et create_streaming_live_table()
sont obsolètes. Databricks recommande de mettre à jour le code existant pour utiliser la fonction create_streaming_table()
.
Remarque
Pour utiliser l’argument cluster_by
afin d’activer le clustering liquide, votre pipeline doit être configuré pour utiliser la chaîne de prévisualisation.
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Arguments |
---|
name Entrez : str Nom de la table. Ce paramètre est obligatoire. |
comment Entrez : str Description facultative de la table. |
spark_conf Entrez : dict Liste facultative de configurations Spark pour l’exécution de cette requête. |
table_properties Entrez : dict Liste facultative des propriétés de table disponibles pour la table. |
partition_cols Entrez : array Liste facultative d’une ou de plusieurs colonnes à utiliser pour le partitionnement de la table. |
cluster_by Entrez : array Activez éventuellement le clustering liquide sur la table et définissez les colonnes à utiliser comme clés de clustering. Consultez Utilisation des clustering liquides pour les tableaux Delta. |
path Entrez : str Emplacement de stockage facultatif pour les données de la table. S’il n’est pas défini, le système utilise par défaut l’emplacement de stockage du pipeline. |
schema Type : str ou StructType Définition de schéma facultative pour la table. Les schémas peuvent être définis en tant que chaîne SQL DDL ou avec Python StructType . |
expect_all expect_all_or_drop expect_all_or_fail Entrez : dict Contraintes facultative de qualité des données pour la table. Consultez attentes multiples. |
row_filter (Préversion publique)Entrez : str Clause de filtre de ligne facultative pour la table. Consultez Publier des tables avec des filtres de lignes et des masques de colonne. |
Contrôler la façon dont les tables sont matérialisées
Les tables offrent également un contrôle supplémentaire de leur matérialisation :
- Avec la propriété
partition_cols
, indiquez de quelle manière les tables sont partitionnées. Le partitionnement vous permet d’accélérer les requêtes. - Vous pouvez définir des propriétés de table lorsque vous définissez une vue ou une table. Voir Propriétés des tables Delta Live Tables.
- Définissez un emplacement de stockage pour les données de table à l’aide du paramètre
path
. Par défaut, les données des tables sont stockées dans l’emplacement de stockage du pipeline sipath
n’est pas défini. - Vous pouvez utiliser des colonnes générées dans votre définition de schéma. Voir Exemple : Spécifier un schéma et des colonnes de partition.
Remarque
Pour les tables d’une taille inférieure à 1 To, Databricks recommande de laisser Delta Live Tables contrôler l’organisation des données. Vous ne devez pas spécifier de colonnes de partition, sauf si vous pensez que votre table dépassera un téraoctet.
Exemple : spécifier un schéma et des colonnes de partition
Vous pouvez éventuellement spécifier un schéma de table à l’aide d’une chaîne StructType
Python ou SQL DDL. Quand elle est spécifiée avec une chaîne DDL, la définition peut inclure des colonnes générées.
L'exemple suivant crée une table appelée sales
avec un schéma spécifié à l'aide de Python StructType
:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
L'exemple suivant spécifie le schéma d'une table à l'aide d'une chaîne DDL, définit une colonne générée et définit une colonne de partition :
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Par défaut, Delta Live Tables déduit le schéma de la définition table
si vous ne spécifiez pas de schéma.
Configurer une table de streaming pour ignorer les modifications dans une table de streaming source
Remarque
- Le drapeau
skipChangeCommits
ne fonctionne qu'avecspark.readStream
en utilisant la fonctionoption()
. Vous ne pouvez pas utiliser cet indicateur dans une fonctiondlt.read_stream()
. - Vous ne pouvez pas utiliser l’indicateur
skipChangeCommits
lorsque la table de streaming source est définie comme cible d’une fonction apply_changes().
Par défaut, les tables de streaming nécessitent des sources en ajout uniquement. Lorsqu'une table de streaming utilise une autre table de streaming comme source et que la table de streaming source nécessite des mises à jour ou des suppressions, par exemple, le traitement du « droit à l'oubli » RGPD, l'indicateur skipChangeCommits
peut être défini lors de la lecture de la table de streaming pour ignorer ces modifications. Pour plus d’informations sur cet indicateur, consultez Ignorer les mises à jour et les suppressions.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Exemple : définir des contraintes de table
Important
Les contraintes de table sont en préversion publique.
Lorsque vous spécifiez un schéma, vous pouvez définir des clés primaires et étrangères. Les contraintes sont informationnelles et ne sont pas appliquées. Consultez la clause CONSTRAINT dans les informations de référence sur le langage SQL.
L’exemple suivant définit une table avec une contrainte de clé primaire et étrangère :
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Exemple : Définir un filtre de lignes et un masque de colonne
Important
Les filtres de lignes et les masques de colonne sont en préversion publique.
Pour créer une vue matérialisée ou une table streaming avec un filtre de lignes et un masque de colonne, utilisez la clause ROW FILTER et la clause MASK. L’exemple suivant montre comment définir une vue matérialisée et une table streaming avec un filtre de ligne et un masque de colonne :
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
Pour plus d’informations sur les filtres de lignes et les masques de colonnes, consultez Publier des tables avec des filtres de lignes et des masques de colonnes.
Propriétés des tables dynamiques Python Delta
Les tableaux suivants décrivent les options et propriétés que vous pouvez spécifier lors de la définition de tables et de vues avec Delta Live Tables :
Remarque
Pour utiliser l’argument cluster_by
afin d’activer le clustering liquide, votre pipeline doit être configuré pour utiliser la chaîne de prévisualisation.
@table ou @view |
---|
name Entrez : str Nom facultatif pour la table ou la vue. S’il n’est pas défini, le nom de la fonction est utilisé comme nom de la table ou de la vue. |
comment Entrez : str Description facultative de la table. |
spark_conf Entrez : dict Liste facultative de configurations Spark pour l’exécution de cette requête. |
table_properties Entrez : dict Liste facultative des propriétés de table disponibles pour la table. |
path Entrez : str Emplacement de stockage facultatif pour les données de la table. S’il n’est pas défini, le système utilise par défaut l’emplacement de stockage du pipeline. |
partition_cols Entrez : a collection of str Collection facultative, comme list d’une ou de plusieurs colonnes à utiliser pour le partitionnement de la table. |
cluster_by Entrez : array Activez éventuellement le clustering liquide sur la table et définissez les colonnes à utiliser comme clés de clustering. Consultez Utilisation des clustering liquides pour les tableaux Delta. |
schema Type : str ou StructType Définition de schéma facultative pour la table. Les schémas peuvent être définis en tant que chaîne SQL DDL ou avec StructType Python. |
temporary Entrez : bool Créez une table, mais ne publiez pas de métadonnées pour la table. Le mot clé temporary indique à Delta Live Tables de créer une table qui est disponible pour le pipeline, mais qui ne doit pas être accessible en dehors du pipeline. Pour réduire le temps de traitement, une table temporaire persiste pendant la durée de vie du pipeline qui la crée, pas uniquement pour une seule mise à jour.La valeur par défaut est False. |
row_filter (Préversion publique)Entrez : str Clause de filtre de ligne facultative pour la table. Consultez Publier des tables avec des filtres de lignes et des masques de colonne. |
Définition de la table ou de la vue |
---|
def <function-name>() Fonction Python qui définit le jeu de données. Si le paramètre name n’est pas défini, <function-name> est utilisé comme nom du jeu de données cible. |
query Instruction SQL Spark qui retourne un jeu de données Spark ou un DataFrame Koalas. Utilisez dlt.read() ou spark.read.table() pour effectuer une lecture complète à partir d’un jeu de données défini dans le même pipeline. Pour lire un jeu de données externe, utilisez la spark.read.table() fonction. Vous ne pouvez pas utiliser dlt.read() pour lire des jeux de données externes. Étant donné que spark.read.table() vous pouvez utiliser pour lire des jeux de données internes, des jeux de données définis en dehors du pipeline actuel et vous permet de spécifier des options de lecture de données, Databricks recommande de l’utiliser au lieu de la dlt.read() fonction.Lorsque vous utilisez la spark.read.table() fonction pour lire à partir d’un jeu de données défini dans le même pipeline, ajoutez le LIVE mot clé au nom du jeu de données dans l’argument de fonction. Par exemple, pour lire dans un jeu de données nommé customers :spark.read.table("LIVE.customers") Vous pouvez également utiliser la fonction spark.read.table() pour lire dans une table inscrite dans le metastore en omettant le mot clé LIVE et en qualifiant éventuellement le nom de la table avec le nom de la base de données :spark.read.table("sales.customers") Utilisez ou spark.readStream.table() effectuez dlt.read_stream() une lecture en continu à partir d’un jeu de données défini dans le même pipeline. Pour effectuer une lecture en continu à partir d’un jeu de données externe, utilisez lespark.readStream.table() fonction. Étant donné que spark.readStream.table() vous pouvez utiliser pour lire des jeux de données internes, des jeux de données définis en dehors du pipeline actuel et vous permet de spécifier des options de lecture de données, Databricks recommande de l’utiliser au lieu de la dlt.read_stream() fonction.Pour définir une requête dans une fonction Delta Live Tables table à l’aide de la syntaxe SQL, utilisez la spark.sql fonction. Consultez l’exemple : Accéder à un jeu de données à l’aide de spark.sql. Pour définir une requête dans une fonction Delta Live Tables table à l’aide de Python, utilisez la syntaxe PySpark . |
Attentes |
---|
@expect("description", "constraint") Déclarez une contrainte de qualité des données identifiée par description . Si une ligne enfreint l’attente, incluez la ligne dans le jeu de données cible. |
@expect_or_drop("description", "constraint") Déclarez une contrainte de qualité des données identifiée par description . Si une ligne enfreint l’attente, supprimez la ligne du jeu de données cible. |
@expect_or_fail("description", "constraint") Déclarez une contrainte de qualité des données identifiée par description . Si une ligne enfreint l’attente, arrêtez immédiatement l’exécution. |
@expect_all(expectations) Déclarez une ou plusieurs contraintes de qualité des données. expectations est un dictionnaire Python, dans lequel la clé est la description de l’attente et la valeur est la contrainte de l’attente. Si une ligne enfreint une des attentes, incluez la ligne dans le jeu de données cible. |
@expect_all_or_drop(expectations) Déclarez une ou plusieurs contraintes de qualité des données. expectations est un dictionnaire Python, dans lequel la clé est la description de l’attente et la valeur est la contrainte de l’attente. Si une ligne enfreint une des attentes, supprimez la ligne du jeu de données cible. |
@expect_all_or_fail(expectations) Déclarez une ou plusieurs contraintes de qualité des données. expectations est un dictionnaire Python, dans lequel la clé est la description de l’attente et la valeur est la contrainte de l’attente. Si une ligne enfreint une des attentes, arrêtez immédiatement l’exécution. |
Modifier la capture de données d’un flux de modification avec Python dans Delta Live Tables
Utilisez la fonction apply_changes()
dans l’API Python pour utiliser la fonctionnalité de capture des changements de données (CDC) Delta Live Tables pour traiter les données sources à partir d’un flux de changements de données (CDF).
Important
Vous devez déclarer une table de streaming cible dans laquelle appliquer les modifications. Vous pouvez éventuellement spécifier le schéma de votre table cible. Lorsque vous spécifiez le schéma de la table cible apply_changes()
, vous devez inclure les colonnes __START_AT
et __END_AT
avec le même type de données que les champs sequence_by
.
Pour créer la table cible requise, vous pouvez utiliser la fonction create_streaming_table() dans l’interface Python Delta Live Tables.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Remarque
Pour le traitement de APPLY CHANGES
, le comportement par défaut des évènements INSERT
et UPDATE
consiste à effectuer un upsert des évènements CDC depuis la source : mettre à jour les lignes de la table cible qui correspondent aux clés spécifiées, ou insérer une nouvelle ligne quand un enregistrement correspondant n’existe pas dans la table cible. La gestion des événements DELETE
peut être spécifiée avec la condition APPLY AS DELETE WHEN
.
Pour en savoir plus sur le traitement CDC avec un flux de changements, consultez API APPLY CHANGES : Simplifiez la capture des changements de données avec Delta Live Tables. Pour obtenir un exemple de l’utilisation de la fonction apply_changes()
, consultez Exemple : traitement SCD type 1 et SCD type 2 avec des données sources CDF.
Important
Vous devez déclarer une table de streaming cible dans laquelle appliquer les modifications. Vous pouvez éventuellement spécifier le schéma de votre table cible. Lorsque vous spécifiez le schéma de la table cible apply_changes
, vous devez inclure les colonnes __START_AT
et __END_AT
avec le même type de données que le champ sequence_by
.
Consultez API APPLY CHANGES : Simplifier la capture des changements de données avec Delta Live Tables.
Arguments |
---|
target Entrez : str Nom de la table à mettre à jour. Vous pouvez utiliser la fonction create_streaming_table() pour créer la table cible avant d'exécuter la fonction apply_changes() .Ce paramètre est obligatoire. |
source Entrez : str Source de données contenant les enregistrements de capture des changements de données. Ce paramètre est obligatoire. |
keys Entrez : list Colonne ou combinaison de colonnes identifiant de façon unique une ligne dans les données sources. Utilisée pour identifier les événements de capture des changements de données qui s’appliquent à des enregistrements spécifiques dans la table cible. Vous pouvez spécifier l’un des éléments suivants : - Liste de chaînes : ["userId", "orderId"] - Liste de fonctions col() Spark SQL : [col("userId"), col("orderId"] Les arguments pour les fonctions col() ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId) , mais pas col(source.userId) .Ce paramètre est obligatoire. |
sequence_by Type : str ou col() Nom de colonne spécifiant l’ordre logique des événements de capture des changements de données dans les données sources. Delta Live Tables utilise ce séquencement pour gérer les événements de modification qui se produisent dans le désordre. Vous pouvez spécifier l’un des éléments suivants : - Chaîne : "sequenceNum" - Fonction col() Spark SQL : col("sequenceNum") Les arguments pour les fonctions col() ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId) , mais pas col(source.userId) .La colonne spécifiée doit être un type de données triable. Ce paramètre est obligatoire. |
ignore_null_updates Entrez : bool Autoriser l’ingestion des mises à jour contenant un sous-ensemble des colonnes cibles. Quand un évènement CDC correspond à une ligne existante et que ignore_null_updates a la valeur True , les colonnes avec null conservent leurs valeurs existantes dans la cible. Cela s’applique également aux colonnes imbriquées avec une valeur de null . Quand ignore_null_updates a la valeur False , les valeurs existantes sont remplacées par des valeurs null .Ce paramètre est facultatif. Par défaut, il s’agit de False . |
apply_as_deletes Type : str ou expr() Spécifie quand un événement de capture des changements de données doit être traité en tant qu’opération DELETE plutôt qu’opération upsert. Pour gérer des données non ordonnées, la ligne supprimée est conservée temporairement en tant qu’objet tombstone dans la table Delta sous-jacente, et un affichage est créé dans le metastore, qui filtre ces objets tombstone. Vous pouvez configurer l’intervalle de conservation avec lapipelines.cdc.tombstoneGCThresholdInSeconds propriété table.Vous pouvez spécifier l’un des éléments suivants : - Chaîne : "Operation = 'DELETE'" - Fonction expr() Spark SQL : expr("Operation = 'DELETE'") Ce paramètre est facultatif. |
apply_as_truncates Type : str ou expr() Spécifie quand un événement de capture des changements de données doit être traité en tant que TRUNCATE de table complet. Étant donné que cette clause déclenche une troncation complète de la table cible, elle doit être utilisée uniquement pour des cas d’usage spécifiques nécessitant cette fonctionnalité.Le paramètre apply_as_truncates est pris en charge uniquement pour le type SCD 1. Le type SCD 2 ne prend pas en charge les opérations de troncation.Vous pouvez spécifier l’un des éléments suivants : - Chaîne : "Operation = 'TRUNCATE'" - Fonction expr() Spark SQL : expr("Operation = 'TRUNCATE'") Ce paramètre est facultatif. |
column_list except_column_list Entrez : list Sous-ensemble de colonnes à inclure dans la table cible. Utilisez column_list pour spécifier la liste complète des colonnes à inclure. Utilisez except_column_list pour spécifier les colonnes à exclure. Vous pouvez déclarer les valeurs comme une liste de chaînes ou en tant que fonctions col() Spark SQL :- column_list = ["userId", "name", "city"] .- column_list = [col("userId"), col("name"), col("city")] - except_column_list = ["operation", "sequenceNum"] - except_column_list = [col("operation"), col("sequenceNum") Les arguments pour les fonctions col() ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId) , mais pas col(source.userId) .Ce paramètre est facultatif. Par défaut toutes les colonnes de la table cible sont incluses quand aucun argument column_list ou except_column_list n’est passé à la fonction. |
stored_as_scd_type Type : str ou int Indique s’il faut stocker des enregistrements en tant que méthode SCD de type 1 ou méthode SCD de type 2. Défini sur 1 pour la méthode SCD de type 1 ou 2 pour la méthode SCD de type 2.Cette clause est facultative. La valeur par défaut est la méthode SCD de type 1. |
track_history_column_list track_history_except_column_list Entrez : list Sous-ensemble de colonnes de sortie à suivre pour l’historique dans la table cible. Utilisez track_history_column_list pour spécifier la liste complète des colonnes à suivre. Utilisationtrack_history_except_column_list pour spécifier les colonnes à exclure du suivi. Vous pouvez déclarer les valeurs comme une liste de chaînes ou en tant que fonctions col() Spark SQL :- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Les arguments pour les fonctions col() ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId) , mais pas col(source.userId) .Ce paramètre est facultatif. La valeur par défaut est d'inclure toutes les colonnes de la table cible lorsque aucune track_history_column_list ouUn argument track_history_except_column_list est transmis à la fonction. |
Capture des changements de données à partir d’instantanés de base de données avec Python dans Delta Live Tables
Important
L’API APPLY CHANGES FROM SNAPSHOT
est en préversion publique.
Utilisez la fonction apply_changes_from_snapshot()
dans l’API Python pour utiliser la fonctionnalité de capture des changements de données (CDC) Delta Live Tables pour traiter les données sources à partir d’instantanés de base de données.
Important
Vous devez déclarer une table de streaming cible dans laquelle appliquer les modifications. Vous pouvez éventuellement spécifier le schéma de votre table cible. Lorsque vous spécifiez le schéma de la table cible apply_changes_from_snapshot()
, vous devez également inclure les colonnes __START_AT
et __END_AT
avec le même type de données que le champ sequence_by
.
Pour créer la table cible requise, vous pouvez utiliser la fonction create_streaming_table() dans l’interface Python Delta Live Tables.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Remarque
Pour le traitement de APPLY CHANGES FROM SNAPSHOT
, le comportement par défaut consiste à insérer une nouvelle ligne lorsqu’un enregistrement correspondant avec la ou les mêmes clés n’existe pas dans la cible. Si un enregistrement correspondant existe, il est mis à jour uniquement si l’une quelconque des valeurs de la ligne a changé. Les lignes avec des clés présentes dans la cible, mais qui ne sont plus présentes dans la source, sont supprimées.
Pour en savoir plus sur le traitement CDC avec des instantanés, consultez API APPLY CHANGES : Simplifiez la capture des changements de données avec Delta Live Tables. Pour des exemples d’utilisation de la fonction apply_changes_from_snapshot()
, consultez les exemples ingestion d’instantané périodique et ingestion d’instantané historique.
Arguments |
---|
target Entrez : str Nom de la table à mettre à jour. Vous pouvez utiliser la fonction create_streaming_table() pour créer la table cible avant d'exécuter la fonction apply_changes() .Ce paramètre est obligatoire. |
source Type : str ou lambda function Nom d’une table ou d’une vue pour réaliser périodiquement un instantané ou d’une fonction lambda Python qui retourne le DataFrame de l’instantané à traiter et la version de l’instantané. Consultez Implémenter l’argument source. Ce paramètre est obligatoire. |
keys Entrez : list Colonne ou combinaison de colonnes identifiant de façon unique une ligne dans les données sources. Utilisée pour identifier les événements de capture des changements de données qui s’appliquent à des enregistrements spécifiques dans la table cible. Vous pouvez spécifier l’un des éléments suivants : - Liste de chaînes : ["userId", "orderId"] - Liste de fonctions col() Spark SQL : [col("userId"), col("orderId"] Les arguments pour les fonctions col() ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId) , mais pas col(source.userId) .Ce paramètre est obligatoire. |
stored_as_scd_type Type : str ou int Indique s’il faut stocker des enregistrements en tant que méthode SCD de type 1 ou méthode SCD de type 2. Défini sur 1 pour la méthode SCD de type 1 ou 2 pour la méthode SCD de type 2.Cette clause est facultative. La valeur par défaut est la méthode SCD de type 1. |
track_history_column_list track_history_except_column_list Entrez : list Sous-ensemble de colonnes de sortie à suivre pour l’historique dans la table cible. Utilisez track_history_column_list pour spécifier la liste complète des colonnes à suivre. Utilisationtrack_history_except_column_list pour spécifier les colonnes à exclure du suivi. Vous pouvez déclarer les valeurs comme une liste de chaînes ou en tant que fonctions col() Spark SQL :- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Les arguments pour les fonctions col() ne peuvent pas inclure de qualificateurs. Par exemple, vous pouvez utiliser col(userId) , mais pas col(source.userId) .Ce paramètre est facultatif. La valeur par défaut est d'inclure toutes les colonnes de la table cible lorsque aucune track_history_column_list ouUn argument track_history_except_column_list est transmis à la fonction. |
Implémenter l’argument source
La fonction apply_changes_from_snapshot()
inclut l’argument source
. Pour le traitement des instantanés historiques, l’argument source
est censé être une fonction lambda Python qui retourne deux valeurs à la fonction apply_changes_from_snapshot()
: un DataFrame Python contenant les données de l’instantané à traiter et une version de l’instantané.
Voici la signature de la fonction lambda :
lambda Any => Optional[(DataFrame, Any)]
- L’argument de la fonction lambda est la version de l’instantanée traitée le plus récemment.
- La valeur de retour de la fonction lambda est
None
ou un tuple de deux valeurs : la première valeur du tuple est un DataFrame contenant l’instantané à traiter. La deuxième valeur du tuple est la version de l’instantané qui représente l’ordre logique de l’instantané.
Exemple qui implémente et appelle la fonction lambda :
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
Le runtime Delta Live Tables effectue les étapes suivantes chaque fois que le pipeline qui contient la fonction apply_changes_from_snapshot()
est déclenché :
- Exécute la fonction
next_snapshot_and_version
pour charger le DataFrame de l’instantané suivant et la version de l’instantané correspondante. - Si aucun DataFrame n’est retourné, l’exécution est terminée et la mise à jour du pipeline est marquée comme terminée.
- Détecte les changements apportés au nouvel instantané et les applique de manière incrémentielle à la table cible.
- Retourne à l’étape 1 pour charger l’instantané suivant et sa version.
Limitations
L’interface Python de Delta Live Tables a les limitations suivantes :
La fonction pivot()
n’est pas prise en charge. L’opération pivot
dans Spark nécessite le chargement hâtif des données d’entrée pour calculer le schéma de sortie. Cette fonctionnalité n’est pas prise en charge dans Delta Live Tables.