Utiliser des tables delta avec des données de streaming

Effectué

Toutes les données que nous avons explorées à ce stade étaient des données statiques dans des fichiers. Toutefois, de nombreux scénarios d’analytique des données impliquent des données de diffusion en continu qui doivent être traitées en temps quasi réel. Par exemple, vous devrez peut-être capturer les lectures émises par les appareils IoT (Internet des objets) et les stocker dans une table à mesure qu’elles se produisent. Spark traite les données par lots et les données de diffusion en continu de la même façon, ce qui permet de traiter les données de diffusion en continu en temps réel à l’aide de la même API.

Spark Structured Streaming

Une solution de traitement de flux classique implique la lecture constante d’un flux de données à partir d’une source, le traitement facultatif pour sélectionner des champs spécifiques, l’agrégation et le regroupement de valeurs, ou la manipulation des données, et l’écriture des résultats dans un récepteur.

Spark inclut la prise en charge native des données de diffusion en continu via Spark Structured Streaming, une API basée sur un dataframe sans limite dans lequel les données de diffusion en continu sont capturées pour traitement. Un DataFrame Spark Structured Streaming peut lire des données à partir de nombreux types de sources de diffusion en continu, notamment :

  • Ports réseau
  • Services de répartiteur de messages en temps réel tels qu’Azure Event Hubs ou Kafka
  • Emplacements du système de fichiers.

Conseil

Pour plus d’informations sur Spark Structured Streaming, consultez le Guide de programmation de streaming structuré de la documentation Spark.

Diffusion en continu avec des tables Delta

Vous pouvez utiliser une table delta comme source ou récepteur pour Spark Structured Streaming. Par exemple, vous pouvez capturer un flux de données en temps réel à partir d’un appareil IoT et écrire le flux directement dans une table Delta en tant que récepteur. Vous pouvez ensuite interroger la table pour afficher les données diffusées en continu les plus récentes. Vous pouvez également lire une table Delta en tant que source de diffusion en continu, ce qui permet de créer des rapports en temps quasi réel lorsque de nouvelles données sont ajoutées à la table.

Utilisation d’une table delta comme source de diffusion en continu

Dans l’exemple PySpark suivant, une table delta est créée pour stocker les détails des commandes commerciales sur Internet :

%%sql
CREATE TABLE orders_in
(
        OrderID INT,
        OrderDate DATE,
        Customer STRING,
        Product STRING,
        Quantity INT,
        Price DECIMAL
)
USING DELTA;

Un flux de données hypothétique de commandes Internet est inséré dans la table orders_in :

%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
    (3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
    (3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
    (3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
    (3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
    (3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);

Pour vérifier, vous pouvez lire et afficher des données à partir de la table d’entrée :

# Read and display the input table
df = spark.read.format("delta").table("orders_in")

display(df)

Les données sont ensuite chargées dans un DataFrame de diffusion en continu à partir de la table Delta :

# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .table("orders_in")

Remarque

Quand vous utilisez une table delta comme source de diffusion en continu, seules des opérations d’ajout peuvent être incluses dans le flux. Les modifications de données provoquent une erreur, sauf si vous spécifiez l’option ignoreChanges ou ignoreDeletes.

Vous pouvez vérifier que le flux est en diffusion en continu à l’aide de la propriété isStreaming qui doit retourner True :

# Verify that the stream is streaming
stream_df.isStreaming

Transformer le flux de données

Après avoir lu les données de la table Delta dans un DataFrame de diffusion en continu, vous pouvez utiliser l’API Spark Structured Streaming pour les traiter. Par exemple, vous pouvez compter le nombre de commandes passées toutes les minutes et envoyer les résultats agrégés à un processus en aval pour une visualisation en temps quasi réel.

Dans cet exemple, toutes les lignes avec NULL dans la colonne Price sont filtrées et de nouvelles colonnes sont ajoutées pour IsBike et Total.

from pyspark.sql.functions import col, expr

transformed_df = stream_df.filter(col("Price").isNotNull()) \
    .withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
    .withColumn('Total', expr("Quantity * Price").cast('decimal'))

Utilisation d’une table Delta comme récepteur de diffusion en continu

Le flux de données est ensuite écrit dans une table Delta :

# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)

print("Streaming to orders_processed...")

Remarque

L’option checkpointLocation est utilisée pour écrire un fichier de point de contrôle qui suit l’état du traitement de flux. Ce fichier vous permet de récupérer à partir d’une défaillance au point où le traitement de flux a été arrêté.

Une fois le processus de diffusion en continu démarré, vous pouvez interroger la table Delta Lake pour voir ce qui se trouve dans la table de sortie. Il peut y avoir un court délai avant de pouvoir interroger la table.

%%sql
SELECT *
    FROM orders_processed
    ORDER BY OrderID;

Dans les résultats de cette requête, la commande 3005 est exclue, car elle a la valeur NULL dans la colonne Price. Et les deux colonnes qui ont été ajoutées pendant la transformation sont affichées : IsBike et Total.

OrderID OrderDate Client Produit Quantity Prix IsBike Total
3001 2023-09-01 Yang Road Bike Red 1 1200 1 1 200
3002 2023-09-01 Carlson Mountain Bike Silver 1 1 500 1 1 500
3003 2023-09-02 Wilson Road Bike Yellow 2 1 350 1 2700
3004 2023-09-02 Yang Road Front Wheel 1 115 0 115

Lorsque vous avez terminé, arrêtez les données de diffusion en continu pour éviter les coûts de traitement inutiles à l’aide de la méthode stop :

# Stop the streaming data to avoid excessive processing costs
delta_stream.stop()

Conseil

Pour plus d’informations sur l’utilisation des tables Delta pour la diffusion en continu des données, consultez Lectures et écritures de diffusion en continu de tables dans la documentation Delta Lake.