Usare tabelle delta con dati di streaming

Completato

Tutti i dati esaminati fino a questo punto sono dati statici nei file. Tuttavia, molti scenari di analisi dei dati includono dati in streaming che devono essere elaborati quasi in tempo reale. Ad esempio, potrebbe essere necessario acquisire le letture generate dai dispositivi IoT e archiviarle man mano in una tabella. Spark elabora i dati batch e i dati di streaming nello stesso modo, consentendo l'elaborazione dei dati di streaming in tempo reale con la stessa API.

Spark Structured Streaming

Una soluzione tipica di elaborazione dei flussi prevede la lettura continua di un flusso di dati da un'origine, facoltativamente l'elaborazione del flusso per selezionare campi specifici, aggregare e raggruppare o modificare i valori, oppure manipolare in altro modo i dati e scrivere i risultati in un sink.

Spark include il supporto nativo per i dati in streaming offrendo Spark Structured Streaming, un'API basata su un dataframe illimitato in cui i dati in streaming vengono acquisiti per l'elaborazione. Un dataframe Spark Structured Streaming può leggere i dati da molti tipi diversi di origine di streaming, tra cui:

  • Porte rete
  • Servizi di brokering di messaggi in tempo reale, ad esempio Hub eventi di Azure o Kafka
  • Percorsi del file system.

Suggerimento

Per altre informazioni su Spark Structured Streaming, vedere la guida alla programmazione di Spark Structured Streaming.

Streaming con tabelle Delta

È possibile usare una tabella Delta Lake come origine o sink per Spark Structured Streaming. Ad esempio, è possibile acquisire un flusso di dati in tempo reale da un dispositivo IoT e scrivere il flusso direttamente in una tabella Delta come sink. È quindi possibile eseguire una query sulla tabella per visualizzare i dati trasmessi più recenti. In alternativa, è possibile leggere una tabella Delta come origine di streaming, abilitando la creazione di report quasi in tempo reale man mano che vengono aggiunti nuovi dati alla tabella.

Uso di una tabella Delta come origine di streaming

Nell'esempio di PySpark seguente viene usata una tabella Delta per archiviare i dettagli degli ordini di vendita Internet.

%%sql
CREATE TABLE orders_in
(
        OrderID INT,
        OrderDate DATE,
        Customer STRING,
        Product STRING,
        Quantity INT,
        Price DECIMAL
)
USING DELTA;

Un ipotetico flusso di dati degli ordini Internet viene inserito nella tabella orders_in:

%%sql
INSERT INTO orders_in (OrderID, OrderDate, Customer, Product, Quantity, Price)
VALUES
    (3001, '2024-09-01', 'Yang', 'Road Bike Red', 1, 1200),
    (3002, '2024-09-01', 'Carlson', 'Mountain Bike Silver', 1, 1500),
    (3003, '2024-09-02', 'Wilson', 'Road Bike Yellow', 2, 1350),
    (3004, '2024-09-02', 'Yang', 'Road Front Wheel', 1, 115),
    (3005, '2024-09-02', 'Rai', 'Mountain Bike Black', 1, NULL);

Per verificare, è possibile leggere e visualizzare i dati dalla tabella di input:

# Read and display the input table
df = spark.read.format("delta").table("orders_in")

display(df)

I dati vengono quindi caricati in un dataframe di streaming dalla tabella Delta:

# Load a streaming DataFrame from the Delta table
stream_df = spark.readStream.format("delta") \
    .option("ignoreChanges", "true") \
    .table("orders_in")

Nota

Quando si usa una tabella Delta come origine di streaming, nel flusso possono essere incluse solo operazioni di aggiunta. Le modifiche ai dati genereranno un errore a meno che non si specifichi l'opzione ignoreChanges o ignoreDeletes .

È possibile verificare che il flusso sia in streaming usando la proprietà isStreaming che deve restituire True:

# Verify that the stream is streaming
stream_df.isStreaming

Trasformare il flusso di dati

Dopo aver letto i dati dalla tabella Delta in un dataframe di streaming, è possibile usare l'API Spark Structured Streaming per elaborarli. Ad esempio, è possibile contare il numero di ordini effettuati ogni minuto e inviare i risultati aggregati a un processo downstream per la visualizzazione quasi in tempo reale.

In questo esempio tutte le righe con NULL nella colonna Price vengono filtrate e vengono aggiunte nuove colonne per IsBike e Total.

from pyspark.sql.functions import col, expr

transformed_df = stream_df.filter(col("Price").isNotNull()) \
    .withColumn('IsBike', expr("INSTR(Product, 'Bike') > 0").cast('int')) \
    .withColumn('Total', expr("Quantity * Price").cast('decimal'))

Uso di una tabella Delta come sink di streaming

Il flusso di dati viene quindi scritto in una tabella Delta:

# Write the stream to a delta table
output_table_path = 'Tables/orders_processed'
checkpointpath = 'Files/delta/checkpoint'
deltastream = transformed_df.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(output_table_path)

print("Streaming to orders_processed...")

Nota

L'opzione checkpointLocation viene usata per scrivere un file di checkpoint che tiene traccia dello stato di elaborazione del flusso. Questo file consente di eseguire il ripristino da un errore nel punto in cui l'elaborazione del flusso è stata interrotta.

Dopo l'avvio del processo di streaming, è possibile eseguire una query sulla tabella Delta Lake per vedere cosa si trova nella tabella di output. Potrebbe verificarsi un breve ritardo prima di poter eseguire una query sulla tabella.

%%sql
SELECT *
    FROM orders_processed
    ORDER BY OrderID;

Nei risultati di questa query l'ordine 3005 viene escluso perché contiene NULL nella colonna Price. E vengono visualizzate le due colonne aggiunte durante la trasformazione: IsBike e Total.

OrderID DataOrdine Customer Prodotto Quantità Prezzo IsBike Totale
3001 01-09-2023 Yang Road Bike Red 1 1200 1 1200
3002 01-09-2023 Carlson Mountain Bike Silver 1 1500 1 1500
3003 2023-09-02 Wilson Road Bike Yellow 2 1350 1 2700
3004 2023-09-02 Yang Road Front Wheel 1 115 0 115

Al termine, arrestare i dati di streaming per evitare costi di elaborazione non necessari usando il metodo stop:

# Stop the streaming data to avoid excessive processing costs
delta_stream.stop()

Suggerimento

Per altre informazioni sull'uso delle tabelle Delta per i dati in streaming, vedere Table streaming reads and writes nella documentazione di Delta Lake.