Condividi tramite


Eseguire il primo carico di lavoro Structured Streaming

Questo articolo fornisce esempi di codice e spiegazione dei concetti di base necessari per eseguire le prime query Structured Streaming in Azure Databricks. È possibile usare Structured Streaming per carichi di lavoro di elaborazione quasi in tempo reale e incrementali.

Structured Streaming è una delle diverse tecnologie che alimentano le tabelle di streaming nelle tabelle Live Delta. Databricks consiglia di usare tabelle Live Delta per tutti i nuovi carichi di lavoro ETL, di inserimento e Structured Streaming. Vedere Che cos'è Delta Live Tables?.

Nota

Sebbene le tabelle live Delta forniscano una sintassi leggermente modificata per la dichiarazione di tabelle di streaming, la sintassi generale per la configurazione delle letture e delle trasformazioni di streaming si applica a tutti i casi d'uso di streaming in Azure Databricks. Le tabelle live delta semplificano anche lo streaming gestendo informazioni sullo stato, metadati e numerose configurazioni.

Usare il caricatore automatico per leggere i dati di streaming dall'archiviazione oggetti

L'esempio seguente illustra il caricamento di dati JSON con il caricatore automatico, che usa cloudFiles per indicare il formato e le opzioni. L'opzione schemaLocation abilita l'inferenza dello schema e l'evoluzione. Incollare il codice seguente in una cella del notebook di Databricks ed eseguire la cella per creare un DataFrame di streaming denominato raw_df:

file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

raw_df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

Analogamente ad altre operazioni di lettura in Azure Databricks, la configurazione di una lettura in streaming non carica effettivamente i dati. È necessario attivare un'azione sui dati prima dell'inizio del flusso.

Nota

La chiamata display() a un DataFrame di streaming avvia un processo di streaming. Per la maggior parte dei casi d'uso di Structured Streaming, l'azione che attiva un flusso dovrebbe essere scrivere dati in un sink. Vedere Considerazioni sulla produzione per Structured Streaming.

Eseguire una trasformazione di streaming

Structured Streaming supporta la maggior parte delle trasformazioni disponibili in Azure Databricks e Spark SQL. È anche possibile caricare i modelli MLflow come funzioni definite dall'utente ed eseguire stime di streaming come trasformazione.

L'esempio di codice seguente completa una semplice trasformazione per arricchire i dati JSON inseriti con informazioni aggiuntive usando le funzioni Spark SQL:

from pyspark.sql.functions import col, current_timestamp

transformed_df = (raw_df.select(
    "*",
    col("_metadata.file_path").alias("source_file"),
    current_timestamp().alias("processing_time")
    )
)

L'oggetto risultante transformed_df contiene istruzioni di query per caricare e trasformare ogni record man mano che arriva nell'origine dati.

Nota

Structured Streaming considera le origini dati come set di dati non associati o infiniti. Di conseguenza, alcune trasformazioni non sono supportate nei carichi di lavoro Structured Streaming perché richiedono l'ordinamento di un numero infinito di elementi.

La maggior parte delle aggregazioni e molti join richiedono la gestione delle informazioni sullo stato con limiti, finestre e modalità di output. Vedere Applicare limiti per controllare le soglie di elaborazione dati.

Eseguire una scrittura batch incrementale in Delta Lake

L'esempio seguente scrive in Delta Lake usando un percorso e un checkpoint di file specificati.

Importante

Assicurarsi sempre di specificare una posizione di checkpoint univoca per ogni writer di streaming configurato. Il checkpoint fornisce l'identità univoca per il flusso, monitorando tutti i record elaborati e le informazioni sullo stato associati alla query di streaming.

L'impostazione availableNow per il trigger indica a Structured Streaming di elaborare tutti i record non elaborati in precedenza dal set di dati di origine e quindi arrestarli, in modo da poter eseguire in modo sicuro il codice seguente senza doversi preoccupare di lasciare in esecuzione un flusso:

target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"

transformed_df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .start()

In questo esempio non arrivano nuovi record nell'origine dati, quindi l'esecuzione ripetuta di questo codice non inserisce nuovi record.

Avviso

L'esecuzione di Structured Streaming può impedire la chiusura automatica delle risorse di calcolo. Per evitare costi imprevisti, assicurarsi di terminare le query di streaming.

Leggere i dati da Delta Lake, trasformare e scrivere in Delta Lake

Delta Lake offre un ampio supporto per l'uso di Structured Streaming sia come origine che come sink. Vedere Letture e scritture di streaming di tabelle Delta.

Nell'esempio seguente viene illustrata la sintassi di esempio per caricare in modo incrementale tutti i nuovi record da una tabella Delta, unirli con uno snapshot di un'altra tabella Delta e scriverli in una tabella Delta:

(spark.readStream
    .table("<table-name1>")
    .join(spark.read.table("<table-name2>"), on="<id>", how="left")
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "<checkpoint-path>")
    .toTable("<table-name3>")
)

È necessario disporre delle autorizzazioni appropriate configurate per leggere le tabelle di origine e scrivere nelle tabelle di destinazione e nella posizione del checkpoint specificato. Compila tutti i parametri indicati con parentesi acute (<>) usando i valori pertinenti per le origini dati e i sink.

Nota

Delta Live Tables fornisce una sintassi completamente dichiarativa per la creazione di pipeline Delta Lake e gestisce automaticamente proprietà come trigger e checkpoint. Vedere Che cos'è Delta Live Tables?.

Leggere i dati da Kafka, trasformare e scrivere in Kafka

Apache Kafka e altri bus di messaggistica offrono una certa latenza più bassa disponibile per set di dati di grandi dimensioni. È possibile usare Azure Databricks per applicare trasformazioni ai dati inseriti da Kafka e quindi scrivere nuovamente i dati in Kafka.

Nota

La scrittura di dati nell'archiviazione oggetti cloud comporta un sovraccarico di latenza aggiuntivo. Se si desidera archiviare dati da un bus di messaggistica in Delta Lake, ma è necessaria la latenza più bassa possibile per i carichi di lavoro di streaming, Databricks consiglia di configurare processi di streaming separati per inserire dati nella lakehouse e applicare trasformazioni quasi in tempo reale per i sink del bus di messaggistica downstream.

L'esempio di codice seguente illustra un modello semplice per arricchire i dati di Kafka unendo i dati in una tabella Delta e quindi scrivendo di nuovo in Kafka:

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("subscribe", "<topic>")
    .option("startingOffsets", "latest")
    .load()
    .join(spark.read.table("<table-name>"), on="<id>", how="left")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<server:ip>")
    .option("topic", "<topic>")
    .option("checkpointLocation", "<checkpoint-path>")
    .start()
)

È necessario disporre delle autorizzazioni appropriate configurate per l'accesso al servizio Kafka. Compila tutti i parametri indicati con parentesi acute (<>) usando i valori pertinenti per le origini dati e i sink. Vedere Elaborazione del flusso con Apache Kafka e Azure Databricks.