Eseguire il primo carico di lavoro Structured Streaming
Questo articolo fornisce esempi di codice e spiegazione dei concetti di base necessari per eseguire le prime query Structured Streaming in Azure Databricks. È possibile usare Structured Streaming per carichi di lavoro di elaborazione quasi in tempo reale e incrementali.
Structured Streaming è una delle molte tecnologie che alimentano lo streaming tables in Delta Live Tables. Databricks consiglia di usare Delta Live Tables per tutti i nuovi carichi di lavoro ETL, acquisizione dati e Structured Streaming. Vedi Che cos'è Delta Live Tables?.
Nota
Mentre Delta Live Tables fornisce una sintassi leggermente modificata per dichiarare lo streaming tables, la sintassi generale per la configurazione delle letture e delle trasformazioni di streaming si applica a tutti i casi d'uso di streaming in Azure Databricks. Delta Live Tables semplifica anche lo streaming gestendo informazioni sullo stato, metadati e numerose configurazioni.
Usare il caricatore automatico per leggere i dati di streaming dall'archiviazione oggetti
L'esempio seguente illustra il caricamento di dati JSON con il caricatore automatico, che usa cloudFiles
per indicare il formato e le opzioni. L'opzione schemaLocation
abilita inferenza ed evoluzione schema. Incollare il codice seguente in una cella del notebook di Databricks ed eseguire la cella per creare un DataFrame di streaming denominato raw_df
:
file_path = "/databricks-datasets/structured-streaming/events"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
raw_df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(file_path)
)
Analogamente ad altre operazioni di lettura in Azure Databricks, la configurazione di una lettura in streaming non carica effettivamente i dati. È necessario attivare un'azione sui dati prima dell'inizio del flusso.
Nota
La chiamata display()
a un DataFrame di streaming avvia un processo di streaming. Per la maggior parte dei casi d'uso di Structured Streaming, l'azione che attiva un flusso dovrebbe essere scrivere dati in un sink. Vedere Considerazioni sulla produzione per Structured Streaming.
Eseguire una trasformazione di streaming
Structured Streaming supporta la maggior parte delle trasformazioni disponibili in Azure Databricks e Spark SQL. È anche possibile caricare i modelli MLflow come funzioni definite dall'utente ed eseguire stime di streaming come trasformazione.
L'esempio di codice seguente completa una semplice trasformazione per arricchire i dati JSON inseriti con informazioni aggiuntive usando le funzioni Spark SQL:
from pyspark.sql.functions import col, current_timestamp
transformed_df = (raw_df.select(
"*",
col("_metadata.file_path").alias("source_file"),
current_timestamp().alias("processing_time")
)
)
L'oggetto risultante transformed_df
contiene istruzioni di query per caricare e trasformare ogni record man mano che arriva nell'origine dati.
Nota
Structured Streaming considera le origini dati come set di dati non associati o infiniti. Di conseguenza, alcune trasformazioni non sono supportate nei carichi di lavoro Structured Streaming perché richiedono l'ordinamento di un numero infinito di elementi.
La maggior parte delle aggregazioni e molti join richiedono la gestione delle informazioni sullo stato con limiti, finestre e modalità di output. Vedere Applicare limiti per controllare le soglie di elaborazione dati.
Eseguire una scrittura batch incrementale in Delta Lake
L'esempio seguente scrive in Delta Lake usando un percorso e un checkpoint di file specificati.
Importante
Assicurarsi sempre di specificare una posizione di checkpoint univoca per ogni writer di streaming configurato. Il checkpoint fornisce l'identità univoca per il flusso, monitorando tutti i record elaborati e le informazioni sullo stato associati alla query di streaming.
L'impostazione availableNow
per il trigger indica a Structured Streaming di elaborare tutti i record non elaborati in precedenza dal set di dati di origine e quindi arrestarli, in modo da poter eseguire in modo sicuro il codice seguente senza doversi preoccupare di lasciare in esecuzione un flusso:
target_path = "/tmp/ss-tutorial/"
checkpoint_path = "/tmp/ss-tutorial/_checkpoint"
transformed_df.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", checkpoint_path)
.option("path", target_path)
.start()
In questo esempio non arrivano nuovi record nell'origine dati, quindi l'esecuzione ripetuta di questo codice non inserisce nuovi record.
Avviso
L'esecuzione di Structured Streaming può impedire la chiusura automatica delle risorse di calcolo. Per evitare costi imprevisti, assicurarsi di terminare le query di streaming.
Leggere i dati da Delta Lake, trasformare e scrivere in Delta Lake
Delta Lake offre un ampio supporto per l'uso di Structured Streaming sia come origine che come sink. Consultare letture e scritture in streaming table.
Nell'esempio seguente viene illustrata la sintassi di esempio per caricare in modo incrementale tutti i nuovi record da un tableDelta, join con uno snapshot di un altro tableDelta e scriverli in un tableDelta:
(spark.readStream
.table("<table-name1>")
.join(spark.read.table("<table-name2>"), on="<id>", how="left")
.writeStream
.trigger(availableNow=True)
.option("checkpointLocation", "<checkpoint-path>")
.toTable("<table-name3>")
)
È necessario disporre delle autorizzazioni appropriate configurate per leggere le tables di origine e scrivere nell'tables di destinazione e nel percorso del checkpoint specificato. Compilare tutte le parameters indicate con parentesi angolari (<>
) usando le values pertinenti per le fonti dati e di destinazione.
Nota
Delta Live Tables fornisce una sintassi completamente dichiarativa per la creazione di pipeline Delta Lake e gestisce automaticamente proprietà quali i trigger e i checkpoint. Vedi Che cos'è Delta Live Tables?.
Leggere i dati da Kafka, trasformare e scrivere in Kafka
Apache Kafka e altri bus di messaggistica offrono una certa latenza più bassa disponibile per set di dati di grandi dimensioni. È possibile usare Azure Databricks per applicare trasformazioni ai dati inseriti da Kafka e quindi scrivere nuovamente i dati in Kafka.
Nota
La scrittura di dati nell'archiviazione oggetti cloud comporta un sovraccarico di latenza aggiuntivo. Se si desidera archiviare dati da un bus di messaggistica in Delta Lake, ma è necessaria la latenza più bassa possibile per i carichi di lavoro di streaming, Databricks consiglia di configurare processi di streaming separati per inserire dati nella lakehouse e applicare trasformazioni quasi in tempo reale per i sink del bus di messaggistica downstream.
L'esempio di codice seguente illustra un modello semplice per arricchire i dati di Kafka unendoli ai dati in Delta table e quindi registrandoli nuovamente su Kafka.
(spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "<topic>")
.option("startingOffsets", "latest")
.load()
.join(spark.read.table("<table-name>"), on="<id>", how="left")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("topic", "<topic>")
.option("checkpointLocation", "<checkpoint-path>")
.start()
)
È necessario disporre delle autorizzazioni appropriate configurate per l'accesso al servizio Kafka. Compilare tutte le parameters indicate con parentesi angolari (<>
) usando le values pertinenti per le fonti dati e di destinazione. Vedere Elaborazione del flusso con Apache Kafka e Azure Databricks.