Partager via


Développer du code de pipeline avec SQL

Delta Live Tables introduit plusieurs nouveaux mots clés et fonctions SQL pour définir des vues matérialisées et des tables de diffusion en continu dans des pipelines. La prise en charge de SQL pour le développement de pipelines s’appuie sur les principes de base de Spark SQL et ajoute la prise en charge des fonctionnalités Structured Streaming.

Les utilisateurs familiarisés avec les DataFrames PySpark peuvent préférer développer du code de pipeline avec Python. Python prend en charge des tests et des opérations plus étendus qui sont difficiles à implémenter avec SQL, comme les opérations de métagrammation. Consultez Développer du code de pipeline avec Python.

Pour obtenir une référence complète de la syntaxe SQL Delta Live Tables, consultez la référence du langage SQL Delta Live Tables.

Notions de base de SQL pour le développement de pipelines

Le code SQL qui crée des jeux de données Delta Live Tables utilise la CREATE OR REFRESH syntaxe pour définir des vues matérialisées et des tables de diffusion en continu par rapport aux résultats de la requête.

Le STREAM mot clé indique si la source de données référencée dans une SELECT clause doit être lue avec la sémantique de diffusion en continu.

Le code source Delta Live Tables diffère critiquement des scripts SQL : Delta Live Tables évalue toutes les définitions de jeu de données sur tous les fichiers de code source configurés dans un pipeline et génère un graphique de flux de données avant l’exécution des requêtes. L’ordre des requêtes apparaissant dans un notebook ou un script ne définit pas l’ordre d’exécution.

Créer une vue matérialisée avec SQL

L’exemple de code suivant illustre la syntaxe de base pour la création d’une vue matérialisée avec SQL :

CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;

Créer une table de diffusion en continu avec SQL

L’exemple de code suivant illustre la syntaxe de base pour la création d’une table de streaming avec SQL :

Remarque

Toutes les sources de données ne prennent pas en charge les lectures de diffusion en continu, et certaines sources de données doivent toujours être traitées avec la sémantique de diffusion en continu.

CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;

Charger des données à partir du stockage d’objets

Delta Live Tables prend en charge le chargement de données à partir de tous les formats pris en charge par Azure Databricks. Consultez Options de format de données.

Remarque

Ces exemples utilisent des données disponibles sous le /databricks-datasets montage automatique de votre espace de travail. Databricks recommande d’utiliser des chemins de volume ou des URI cloud pour référencer les données stockées dans le stockage d’objets cloud. Consultez Présentation des volumes Unity Catalog.

Databricks recommande d’utiliser le chargeur automatique et les tables de diffusion en continu lors de la configuration de charges de travail d’ingestion incrémentielles sur les données stockées dans le stockage d’objets cloud. Consultez Qu’est-ce que Auto Loader ?.

SQL utilise la fonction pour appeler la read_files fonctionnalité chargeur automatique. Vous devez également utiliser le STREAM mot clé pour configurer une lecture en continu avec read_files.

L’exemple suivant crée une table de streaming à partir de fichiers JSON à l’aide du chargeur automatique :

CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

La read_files fonction prend également en charge la sémantique des lots pour créer des vues matérialisées. L’exemple suivant utilise la sémantique de traitement par lots pour lire un répertoire JSON et créer une vue matérialisée :

CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");

Valider les données avec des attentes

Vous pouvez utiliser des attentes pour définir et appliquer des contraintes de qualité des données. Consultez Gérer la qualité des données avec Delta Live Tables.

Le code suivant définit une attente nommée valid_data qui supprime les enregistrements null lors de l’ingestion des données :

CREATE OR REFRESH STREAMING TABLE orders_valid(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

Interroger des vues matérialisées et des tables de diffusion en continu définies dans votre pipeline

Utilisez le LIVE schéma pour interroger d’autres vues matérialisées et tables de diffusion en continu définies dans votre pipeline.

L’exemple suivant définit quatre jeux de données :

  • Table de diffusion en continu nommée orders qui charge des données JSON.
  • Vue matérialisée nommée customers qui charge les données CSV.
  • Vue matérialisée nommée customer_orders qui joint des enregistrements à partir des jeux de données et customers des orders jeux de données, convertit l’horodatage de l’ordre en date, puis sélectionne les champs , order_numberet stateorder_date les customer_idchamps.
  • Vue matérialisée nommée daily_orders_by_state qui agrège le nombre quotidien de commandes pour chaque état.
CREATE OR REFRESH STREAMING TABLE orders(
  CONSTRAINT valid_date
  EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
  ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");

CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");

CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
  c.customer_id,
  o.order_number,
  c.state,
  date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;

CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;