Partager via


Informations de référence sur le langage SQL dans Delta Live Tables

Cet article contient des détails sur l’interface de programmation SQL de Delta Live Tables.

Vous pouvez utiliser des fonctions définies par l’utilisateur (UDF) Python dans vos requêtes SQL, mais vous devez définir ces fonctions UDF dans les fichiers Python avant de les appeler dans les fichiers sources SQL. Consultez Fonctions scalaires définies par l'utilisateur – Python.

Limites

La clause PIVOT n'est pas prise en charge. L’opération pivot dans Spark nécessite le chargement hâtif des données d’entrée pour calculer le schéma de sortie. Cette fonctionnalité n’est pas prise en charge dans Delta Live Tables.

Créer une vue matérialisée ou une table de diffusion en continu Delta Live Tables

Remarque

  • La syntaxe CREATE OR REFRESH LIVE TABLE pour créer une vue matérialisée est déconseillée. Utilisez plutôt CREATE OR REFRESH MATERIALIZED VIEW.
  • Pour utiliser la clause CLUSTER BY afin d’activer le clustering liquide, votre pipeline doit être configuré pour utiliser la chaîne d’aperçu.

Vous utilisez la même syntaxe SQL de base lors de la déclaration d’une table de diffusion en continu ou d’une vue matérialisée.

Déclarer une vue matérialisée Delta Live Tables avec SQL

L’exemple suivant décrit la syntaxe permettant de déclarer une vue matérialisée dans les tables dynamiques Delta avec SQL :

CREATE OR REFRESH MATERIALIZED VIEW view_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Déclarer une table de streaming Delta Live Tables avec SQL

Vous pouvez uniquement déclarer des tables de diffusion en continu à l’aide de requêtes qui lisent sur une source de diffusion en continu. Databricks recommande d’utiliser Auto Loader pour l’ingestion d’une diffusion en continu de fichiers à partir du stockage d’objets cloud. Consultez Syntaxe SQL Auto Loader.

Lorsque vous spécifiez d’autres tables ou vues dans votre pipeline en tant que sources de diffusion en continu, vous devez inclure la fonction STREAM() autour du nom du jeu de données.

L’exemple suivant décrit la syntaxe de déclaration d’une table streaming dans Delta Live Tables avec SQL :

CREATE OR REFRESH [TEMPORARY] STREAMING TABLE table_name [CLUSTER BY (col_name1, col_name2, ... )]
  [(
    [
    col_name1 col_type1 [ GENERATED ALWAYS AS generation_expression1 ] [ COMMENT col_comment1 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    col_name2 col_type2 [ GENERATED ALWAYS AS generation_expression2 ] [ COMMENT col_comment2 ] [ column_constraint ] [ MASK func_name [ USING COLUMNS ( other_column_name | constant_literal [, ...] ) ] ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
    [ table_constraint ] [, ...]
  )]
  [USING DELTA]
  [PARTITIONED BY (col_name1, col_name2, ... )]
  [LOCATION path]
  [COMMENT table_comment]
  [TBLPROPERTIES (key1 [ = ] val1, key2 [ = ] val2, ... )]
  [ WITH { ROW FILTER func_name ON ( [ column_name | constant_literal [, ...] ] ) [...] } ]
  AS select_statement

Créer une vue Delta Live Tables

Ce qui suit décrit la syntaxe pour déclarer des vues dans SQL :

CREATE TEMPORARY [STREAMING] LIVE VIEW view_name
  [(
    [
    col_name1 [ COMMENT col_comment1 ],
    col_name2 [ COMMENT col_comment2 ],
    ...
    ]
    [
    CONSTRAINT expectation_name_1 EXPECT (expectation_expr1) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    CONSTRAINT expectation_name_2 EXPECT (expectation_expr2) [ON VIOLATION { FAIL UPDATE | DROP ROW }],
    ...
    ]
  )]
  [COMMENT view_comment]
  AS select_statement

Syntaxe SQL Auto Loader

Vous trouverez ci-après une description de la syntaxe permettant d’utiliser Auto Loader dans SQL :

CREATE OR REFRESH STREAMING TABLE table_name
AS SELECT *
  FROM read_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

Vous pouvez utiliser des options de format prises en charge avec Auto Loader. À l’aide de la fonction map(), vous pouvez transmettre des options à la méthode read_files(). Les options sont des paires clé-valeur, où les clés et les valeurs sont des chaînes. Pour plus d’informations sur les formats et options de prise en charge, consultez Options de format de fichier.

Exemple : définir des tables

Vous pouvez créer un jeu de données en lisant les données à partir d’une source de données externe ou des jeux de données définis dans un pipeline. Pour lire dans un jeu de données interne, ajoutez le mot clé LIVE au début le nom du jeu de données. L’exemple suivant définit deux jeux de données différents : une table appelée taxi_raw qui prend un fichier JSON comme source d’entrée et une table appelée filtered_data qui prend la table taxi_raw comme entrée :

CREATE OR REFRESH MATERIALIZED VIEW taxi_raw
AS SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`

CREATE OR REFRESH MATERIALIZED VIEW filtered_data
AS SELECT
  ...
FROM LIVE.taxi_raw

Exemple : lecture à partir d’une source de diffusion en continu

Pour lire des données à partir d’une source de streaming, par exemple Auto Loader ou un jeu de données interne, définissez une table STREAMING :

CREATE OR REFRESH STREAMING TABLE customers_bronze
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE customers_silver
AS SELECT * FROM STREAM(LIVE.customers_bronze)

Pour plus d’informations sur la diffusion en continu de données, consultez Transformer les données avec Delta Live Tables.

Contrôler la façon dont les tables sont matérialisées

Les tables offrent également un contrôle supplémentaire de leur matérialisation :

Remarque

Pour les tables d’une taille inférieure à 1 To, Databricks recommande de laisser Delta Live Tables contrôler l’organisation des données. Si vous ne prévoyez pas de faire croître votre table de plus d’un téra-octet, Databricks vous recommande de ne pas spécifier de colonnes de partition.

Exemple : spécifier un schéma et des colonnes de partition

Vous pouvez éventuellement spécifier un schéma lorsque vous définissez une table. L’exemple suivant spécifie le schéma de la table cible, notamment l’utilisation de colonnes générées par Delta Lake et la définition de colonnes de partition pour la table :

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
) PARTITIONED BY (order_day_of_week)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Par défaut, Delta Live Tables déduit le schéma de la définition table si vous ne spécifiez pas de schéma.

Exemple : définir des contraintes de table

Remarque

La prise en charge de Delta Live Tables pour les contraintes de table est disponible en préversion publique. Pour définir des contraintes de table, votre pipeline doit être compatible avec Unity Catalog et configuré pour utiliser le canal preview.

Lorsque vous spécifiez un schéma, vous pouvez définir des clés primaires et étrangères. Les contraintes sont informationnelles et ne sont pas appliquées. Consultez la clause CONSTRAINT dans les informations de référence sur le langage SQL.

L’exemple suivant définit une table avec une contrainte de clé primaire et étrangère :

CREATE OR REFRESH MATERIALIZED VIEW sales
(customer_id STRING NOT NULL PRIMARY KEY,
  customer_name STRING,
  number_of_line_items STRING,
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
  CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
)
COMMENT "Raw data on sales"
AS SELECT * FROM ...

Paramétrer les valeurs utilisées lors de la déclaration de tables ou de vues avec SQL

Utilisez SET pour spécifier une valeur de configuration dans une requête qui déclare une table ou une vue, y compris des configurations Spark. Toute table ou vue que vous définissez dans un notebook après l’instruction SET a accès à la valeur définie. Toutes les configurations Spark spécifiées avec l’instruction SET sont appliquées lors de l’exécution de la requête Spark sur une table ou une vue définie à la suite de l’instruction SET. Pour lire une valeur de configuration dans une requête, utilisez la syntaxe d’interpolation de chaîne ${}. L’exemple suivant définit une valeur de configuration Spark nommée startDate, puis utilise cette valeur dans une requête :

SET startDate='2020-01-01';

CREATE OR REFRESH MATERIALIZED VIEW filtered
AS SELECT * FROM src
WHERE date > ${startDate}

Pour spécifier plusieurs valeurs de configuration, utilisez une instruction SET distincte pour chaque valeur.

Exemple : définition d’un filtre de lignes et d’un masque de colonnes

Important

Les filtres de lignes et les masques de colonne sont en préversion publique.

Pour créer une vue matérialisée ou une table streaming avec un filtre de lignes et un masque de colonne, utilisez la clause ROW FILTER et la clause MASK. L’exemple suivant montre comment définir une vue matérialisée et une table streaming avec un filtre de ligne et un masque de colonne :

CREATE OR REFRESH STREAMING TABLE customers_silver (
  id int COMMENT 'This is the customer ID',
  name string,
  region string,
  ssn string MASK catalog.schema.ssn_mask_fn COMMENT 'SSN masked for privacy'
)
WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
AS SELECT * FROM STREAM(LIVE.customers_bronze)

CREATE OR REFRESH MATERIALIZED VIEW sales (
  customer_id STRING MASK catalog.schema.customer_id_mask_fn,
  customer_name STRING,
  number_of_line_items STRING COMMENT 'Number of items in the order',
  order_datetime STRING,
  order_number LONG,
  order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
)
COMMENT "Raw data on sales"
WITH ROW FILTER catalog.schema.order_number_filter_fn ON (order_number)
AS SELECT * FROM LIVE.sales_bronze

Pour plus d’informations sur les filtres de lignes et les masques de colonnes, consultez Publier des tables avec des filtres de lignes et des masques de colonnes.

Propriétés SQL

Remarque

Pour utiliser la clause CLUSTER BY afin d’activer le clustering liquide, votre pipeline doit être configuré pour utiliser la chaîne d’aperçu.

CREATE TABLE ou VIEW
TEMPORARY

Créez une table mais ne publiez pas de métadonnées pour la table. La clause TEMPORARY indique à Delta Live Tables de créer une table disponible pour le pipeline, mais qui ne doit pas être accessible en dehors du pipeline. Pour réduire le temps de traitement, une table temporaire persiste pendant la durée de vie du pipeline qui la crée, pas uniquement pour une seule mise à jour.
STREAMING

Crée une table qui lit un jeu de données d’entrée en tant que flux. Le jeu de données d’entrée doit être une source de données en streaming, par exemple Auto Loader ou une table STREAMING.
CLUSTER BY

Activez le clustering liquide sur la table et définissez les colonnes à utiliser comme clés de clustering.

Consultez Utilisation des clustering liquides pour les tableaux Delta.
PARTITIONED BY

Liste facultative d’une ou de plusieurs colonnes à utiliser pour le partitionnement de la table.
LOCATION

Emplacement de stockage facultatif pour les données de la table. S’il n’est pas défini, le système utilise par défaut l’emplacement de stockage du pipeline.
COMMENT

Description facultative de la table.
column_constraint

Une contrainte de clé primaire ou de clé étrangère facultative et informationnelle sur la colonne.
MASK clause (Préversion publique)

Permet d’ajouter une fonction de masque de colonne pour anonymiser les données sensibles. Les futures requêtes pour cette colonne retournent le résultat de la fonction évaluée au lieu de la valeur d’origine de la colonne. Cela est utile pour le contrôle d’accès affiné, car la fonction peut vérifier l’identité et les appartenances à un groupe de l’utilisateur afin de décider s’il faut masquer la valeur.

Voir Clause de masque de colonne.
table_constraint

Une contrainte de clé primaire ou de clé étrangère facultative et informationnelle sur la table.
TBLPROPERTIES

Liste facultative des propriétés de table disponibles pour la table.
WITH ROW FILTER clause (Préversion publique)

Permet d’ajouter une fonction de filtre de ligne à la table. Toutes les requêtes futures de cette table reçoivent un sous-ensemble de lignes pour lesquelles la fonction prend la valeur TRUE. Cela est utile pour le contrôle d’accès affiné, car la fonction peut inspecter l’identité et les appartenances à un groupe de l’utilisateur appelant afin de décider s’il convient de filtrer certaines lignes.

Consultez Clause ROW FILTER.
select_statement

Requête Delta Live Tables qui définit le jeu de données pour la table.
Clause CONSTRAINT
EXPECT expectation_name

Définit la contrainte de qualité des données expectation_name. Si la contrainte ON VIOLATION n’est pas définie, ajoutez les lignes qui ne la respectent pas dans le jeu de données cible.
ON VIOLATION

Action facultative à effectuer pour les lignes incriminées :

- FAIL UPDATE : arrêter immédiatement l’exécution du pipeline.
- DROP ROW : supprimer l’enregistrement et continuer le traitement.

Capture des changements de données avec SQL dans Delta Live Tables

Utilisez l’instruction APPLY CHANGES INTO pour utiliser la fonctionnalité de capture des changements de données Delta Live Tables, comme décrit dans les éléments suivants :

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Vous définissez des contraintes de qualité des données pour une cible APPLY CHANGES à l’aide de la même clause CONSTRAINT que les requêtes non-APPLY CHANGES. Consultez Gérer la qualité des données avec Delta Live Tables.

Remarque

Le comportement par défaut pour les événements INSERT et UPDATE consiste à effectuer un upsert des événements de capture des changements de données à partir de la source : mettre à jour les lignes de la table cible qui correspondent aux clés spécifiées, ou insérer une nouvelle ligne quand un enregistrement correspondant n’existe pas dans la table cible. La gestion des événements DELETE peut être spécifiée avec la condition APPLY AS DELETE WHEN.

Important

Vous devez déclarer une table de streaming cible dans laquelle appliquer les modifications. Vous pouvez éventuellement spécifier le schéma de votre table cible. Lorsque vous spécifiez le schéma de la table cible APPLY CHANGES, vous devez également inclure les colonnes __START_AT et __END_AT avec le même type de données que le champ sequence_by.

Consultez Les API APPLY CHANGES : Simplifiez la capture des changements de données avec Delta Live Tables.

Clauses
KEYS

Colonne ou combinaison de colonnes identifiant de façon unique une ligne dans les données sources. Utilisée pour identifier les événements de capture des changements de données qui s’appliquent à des enregistrements spécifiques dans la table cible.

Pour définir une combinaison de colonnes, utilisez une liste séparée par des virgules de colonnes.

La clause est obligatoire.
IGNORE NULL UPDATES

Autoriser l’ingestion des mises à jour contenant un sous-ensemble des colonnes cibles. Quand un événement de capture des changements de données correspond à une ligne existante et que la commande IGNORE NULL UPDATES est spécifiée, les colonnes contenant null conservent leurs valeurs existantes dans la cible. Cela s’applique également aux colonnes imbriquées avec une valeur de null.

Cette clause est facultative.

La valeur par défaut consiste à remplacer les colonnes existantes par des valeurs null.
APPLY AS DELETE WHEN

Spécifie quand un événement de capture des changements de données doit être traité en tant qu’opération DELETE plutôt qu’opération upsert. Pour gérer des données non ordonnées, la ligne supprimée est conservée temporairement en tant qu’objet tombstone dans la table Delta sous-jacente, et un affichage est créé dans le metastore, qui filtre ces objets tombstone. Vous pouvez configurer l’intervalle de conservation avec la
Propriété de table pipelines.cdc.tombstoneGCThresholdInSeconds.

Cette clause est facultative.
APPLY AS TRUNCATE WHEN

Spécifie quand un événement de capture des changements de données doit être traité en tant que TRUNCATE de table complet. Étant donné que cette clause déclenche une troncation complète de la table cible, elle doit être utilisée uniquement pour des cas d’usage spécifiques nécessitant cette fonctionnalité.

La clause APPLY AS TRUNCATE WHEN est prise en charge uniquement pour le type SCD 1. Le SCD type 2 ne prend pas en charge l’opération de troncature.

Cette clause est facultative.
SEQUENCE BY

Nom de colonne spécifiant l’ordre logique des événements de capture des changements de données dans les données sources. Delta Live Tables utilise ce séquencement pour gérer les événements de modification qui se produisent dans le désordre.

La colonne spécifiée doit être un type de données triable.

La clause est obligatoire.
COLUMNS

Spécifie un sous-ensemble de colonnes à inclure dans la table cible. Vous pouvez :

- Spécifier la liste complète des colonnes à inclure : COLUMNS (userId, name, city).
- Spécifier une liste de colonnes à exclure : COLUMNS * EXCEPT (operation, sequenceNum)

Cette clause est facultative.

Par défaut, quand la clause COLUMNS n’est pas spécifiée, toutes les colonnes dans la table cible sont incluses.
STORED AS

Indique s’il faut stocker des enregistrements en tant que méthode SCD de type 1 ou méthode SCD de type 2.

Cette clause est facultative.

La valeur par défaut est la méthode SCD de type 1.
TRACK HISTORY ON

Spécifie un sous-ensemble de colonnes de sortie pour générer des enregistrements d’historique en cas de modification de ces colonnes spécifiées. Vous pouvez :

- Spécifier la liste complète des colonnes à suivre : COLUMNS (userId, name, city).
- Spécifiez une liste de colonnes à exclure du suivi : COLUMNS * EXCEPT (operation, sequenceNum)

Cette clause est facultative. Par défaut, l’historique de toutes les colonnes de sortie est suivi en cas de modification, équivalent à TRACK HISTORY ON *.