Développer du code de pipeline avec SQL
Delta Live Tables introduit plusieurs nouveaux mots clés et fonctions SQL pour définir des vues matérialisées et des tables de diffusion en continu dans des pipelines. La prise en charge de SQL pour le développement de pipelines s’appuie sur les principes de base de Spark SQL et ajoute la prise en charge des fonctionnalités Structured Streaming.
Les utilisateurs familiarisés avec les DataFrames PySpark peuvent préférer développer du code de pipeline avec Python. Python prend en charge des tests et des opérations plus étendus qui sont difficiles à implémenter avec SQL, comme les opérations de métagrammation. Consultez Développer du code de pipeline avec Python.
Pour obtenir une référence complète de la syntaxe SQL Delta Live Tables, consultez la référence du langage SQL Delta Live Tables.
Notions de base de SQL pour le développement de pipelines
Le code SQL qui crée des jeux de données Delta Live Tables utilise la CREATE OR REFRESH
syntaxe pour définir des vues matérialisées et des tables de diffusion en continu par rapport aux résultats de la requête.
Le STREAM
mot clé indique si la source de données référencée dans une SELECT
clause doit être lue avec la sémantique de diffusion en continu.
Le code source Delta Live Tables diffère critiquement des scripts SQL : Delta Live Tables évalue toutes les définitions de jeu de données sur tous les fichiers de code source configurés dans un pipeline et génère un graphique de flux de données avant l’exécution des requêtes. L’ordre des requêtes apparaissant dans un notebook ou un script ne définit pas l’ordre d’exécution.
Créer une vue matérialisée avec SQL
L’exemple de code suivant illustre la syntaxe de base pour la création d’une vue matérialisée avec SQL :
CREATE OR REFRESH MATERIALIZED VIEW basic_mv
AS SELECT * FROM samples.nyctaxi.trips;
Créer une table de diffusion en continu avec SQL
L’exemple de code suivant illustre la syntaxe de base pour la création d’une table de streaming avec SQL :
Remarque
Toutes les sources de données ne prennent pas en charge les lectures de diffusion en continu, et certaines sources de données doivent toujours être traitées avec la sémantique de diffusion en continu.
CREATE OR REFRESH STREAMING TABLE basic_st
AS SELECT * FROM STREAM samples.nyctaxi.trips;
Charger des données à partir du stockage d’objets
Delta Live Tables prend en charge le chargement de données à partir de tous les formats pris en charge par Azure Databricks. Consultez Options de format de données.
Remarque
Ces exemples utilisent des données disponibles sous le /databricks-datasets
montage automatique de votre espace de travail. Databricks recommande d’utiliser des chemins de volume ou des URI cloud pour référencer les données stockées dans le stockage d’objets cloud. Consultez Présentation des volumes Unity Catalog.
Databricks recommande d’utiliser le chargeur automatique et les tables de diffusion en continu lors de la configuration de charges de travail d’ingestion incrémentielles sur les données stockées dans le stockage d’objets cloud. Consultez Qu’est-ce que Auto Loader ?.
SQL utilise la fonction pour appeler la read_files
fonctionnalité chargeur automatique. Vous devez également utiliser le STREAM
mot clé pour configurer une lecture en continu avec read_files
.
L’exemple suivant crée une table de streaming à partir de fichiers JSON à l’aide du chargeur automatique :
CREATE OR REFRESH STREAMING TABLE ingestion_st
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
La read_files
fonction prend également en charge la sémantique des lots pour créer des vues matérialisées. L’exemple suivant utilise la sémantique de traitement par lots pour lire un répertoire JSON et créer une vue matérialisée :
CREATE OR REFRESH MATERIALIZED VIEW batch_mv
AS SELECT * FROM read_files("/databricks-datasets/retail-org/sales_orders");
Valider les données avec des attentes
Vous pouvez utiliser des attentes pour définir et appliquer des contraintes de qualité des données. Consultez Gérer la qualité des données avec Delta Live Tables.
Le code suivant définit une attente nommée valid_data
qui supprime les enregistrements null lors de l’ingestion des données :
CREATE OR REFRESH STREAMING TABLE orders_valid(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
Interroger des vues matérialisées et des tables de diffusion en continu définies dans votre pipeline
Utilisez le LIVE
schéma pour interroger d’autres vues matérialisées et tables de diffusion en continu définies dans votre pipeline.
L’exemple suivant définit quatre jeux de données :
- Table de diffusion en continu nommée
orders
qui charge des données JSON. - Vue matérialisée nommée
customers
qui charge les données CSV. - Vue matérialisée nommée
customer_orders
qui joint des enregistrements à partir des jeux de données etcustomers
desorders
jeux de données, convertit l’horodatage de l’ordre en date, puis sélectionne les champs ,order_number
etstate
order_date
lescustomer_id
champs. - Vue matérialisée nommée
daily_orders_by_state
qui agrège le nombre quotidien de commandes pour chaque état.
CREATE OR REFRESH STREAMING TABLE orders(
CONSTRAINT valid_date
EXPECT (order_datetime IS NOT NULL AND length(order_datetime) > 0)
ON VIOLATION DROP ROW
)
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders");
CREATE OR REFRESH MATERIALIZED VIEW customers
AS SELECT * FROM read_files("/databricks-datasets/retail-org/customers");
CREATE OR REFRESH MATERIALIZED VIEW customer_orders
AS SELECT
c.customer_id,
o.order_number,
c.state,
date(timestamp(int(o.order_datetime))) order_date
FROM LIVE.orders o
INNER JOIN LIVE.customers c
ON o.customer_id = c.customer_id;
CREATE OR REFRESH MATERIALIZED VIEW daily_orders_by_state
AS SELECT state, order_date, count(*) order_count
FROM LIVE.customer_orders
GROUP BY state, order_date;