Crea una pipeline di analisi end-to-end in Databricks
Questo articolo illustra come creare e distribuire una pipeline di elaborazione dati end-to-end, tra cui come inserire dati non elaborati, trasformare i dati ed eseguire analisi sui dati elaborati.
Nota
Anche se questo articolo illustra come creare una pipeline di dati completa usando Databricks notebook e un processo di Azure Databricks per orchestrare un flusso di lavoro, Databricks consiglia di usare DLT, un'interfaccia dichiarativa per la creazione di pipeline di elaborazione dati affidabili, gestibili e testabili.
Che cos'è una pipeline di dati?
Una pipeline di dati implementa i passaggi necessari per spostare i dati dai sistemi di origine, trasformare i dati in base ai requisiti e archiviare i dati in un sistema di destinazione. Una pipeline di dati include tutti i processi necessari per trasformare i dati non elaborati in dati preparati che gli utenti possono utilizzare. Ad esempio, una pipeline di dati potrebbe preparare i dati in modo che gli analisti dei dati e i data scientist possano estrarre valore da essi tramite analisi e creazione di report.
Un flusso di lavoro di estrazione, trasformazione e caricamento (ETL) è un esempio comune di una pipeline di dati. Nell'elaborazione ETL i dati vengono inseriti dai sistemi di origine e scritti in un'area di gestione temporanea, trasformati in base ai requisiti (garantire la qualità dei dati, deduplicare i record e così via) e quindi scritti in un sistema di destinazione, ad esempio un data warehouse o un data lake.
Passaggi del flusso di dati
Per iniziare a creare pipeline di dati in Azure Databricks, l'esempio incluso in questo articolo illustra come creare un flusso di lavoro di elaborazione dati:
- Usare le funzionalità di Azure Databricks per esplorare un set di dati non elaborato.
- Creare un notebook di Databricks per inserire dati di origine non elaborati e scrivere i dati non elaborati in una tabella di destinazione.
- Creare un notebook di Databricks per trasformare i dati di origine non elaborati e scrivere i dati trasformati in una tabella di destinazione.
- Creare un notebook di Databricks per eseguire una query sui dati trasformati.
- Automatizzare la pipeline di dati con un job di Azure Databricks.
Requisiti
- Si è connessi ad Azure Databricks e nell'area di lavoro Data Science & Engineering.
- Hai l'autorizzazione per creare una risorsa di calcolo o l'accesso a una risorsa di calcolo.
- (Facoltativo) Per pubblicare tabelle in Unity Catalog, è necessario creare un catalogo e uno schema in Unity Catalog.
Esempio: Set di dati Million Song
Il set di dati usato in questo esempio è un subset di Million Song Dataset, una raccolta di caratteristiche e metadati per le tracce musicali contemporanee. Questo set di dati è disponibile nei set di dati di esempio inclusi nell'area di lavoro di Azure Databricks.
Passaggio 1: Creare una risorsa di calcolo
Per eseguire l'elaborazione e l'analisi dei dati in questo esempio, creare una risorsa di calcolo per eseguire i comandi.
Nota
Poiché questo esempio usa un set di dati di esempio archiviato in DBFS e consiglia di rendere persistenti le tabelle per Catalogo Unity, si crea una risorsa di calcolo configurata con modalità di accesso dedicato. La modalità di accesso dedicato consente l'accesso completo a DBFS, consentendo anche l'accesso a Unity Catalog. Vedere Procedure consigliate per DBFS e il catalogo Unity.
- Nella barra laterale fare clic su Ambiente di calcolo.
- Nella pagina Calcolo, fare clic su Crea calcolo.
- Nella nuova pagina di calcolo immettere un nome univoco per la risorsa di calcolo.
- In Advancedimpostare la modalità di accesso su Manuale, quindi selezionare Dedicato.
- In singolo utente o gruppo, seleziona il tuo nome utente.
- Lasciare i valori rimanenti nello stato predefinito e fare clic su Crea.
Per altre informazioni sulle risorse di calcolo di Databricks, vedere Compute.
Passaggio 2: Esplorare i dati di origine
Per informazioni su come usare l'interfaccia di Azure Databricks per esplorare i dati di origine non elaborati, vedere Esplorare i dati di origine per una pipeline di dati. Se si vuole passare direttamente all'inserimento e alla preparazione dei dati, continuare con il Passaggio 3: Inserire i dati non elaborati.
Passaggio 3: Inserire i dati non elaborati
In questo passaggio i dati non elaborati vengono caricati in una tabella per renderli disponibili per un'ulteriore elaborazione. Per gestire gli asset di dati nella piattaforma Databricks, ad esempio le tabelle, Databricks consiglia Unity Catalog. Tuttavia, se non si dispone delle autorizzazioni per creare il catalogo e lo schema necessari per pubblicare le tabelle nel catalogo Unity, è comunque possibile completare i passaggi seguenti pubblicando le tabelle nel metastore Hive.
Databricks consiglia di usare il caricatore automatico per l'inserimento dei dati. Il caricatore automatico rileva ed elabora automaticamente nuovi file quando arrivano nell'archiviazione di oggetti cloud.
È possibile configurare il caricatore automatico per rilevare automaticamente lo schema dei dati caricati, consentendo di inizializzare le tabelle senza dichiarare in modo esplicito lo schema dei dati ed evolvere lo schema della tabella man mano che vengono introdotte nuove colonne. In questo modo si elimina la necessità di tenere traccia e applicare manualmente le modifiche dello schema nel tempo. Databricks consiglia l'inferenza dello schema quando si utilizza il caricatore automatico. Tuttavia, come illustrato nel passaggio di esplorazione dei dati, i dati dei brani non contengono informazioni sull'intestazione. Poiché l'intestazione non viene archiviata con i dati, è necessario definire in modo esplicito lo schema, come illustrato nell'esempio seguente.
Nella barra laterale, fare clic su
Nuovo e selezionare Notebook dal menu. Viene visualizzata la finestra di dialogo Crea notebook.
Immettere un nome per il notebook, ad esempio
Ingest songs data
. Per impostazione predefinita:- Python è il linguaggio selezionato.
- Il notebook è collegato all'ultima risorsa di calcolo usata. In questo caso, la risorsa che hai creata in Passaggio 1: Creare una risorsa di calcolo.
Immettere quanto segue nella prima cella del notebook:
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define variables used in the code below file_path = "/databricks-datasets/songs/data-001/" table_name = "<table-name>" checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data" schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path) .writeStream .option("checkpointLocation", checkpoint_path) .trigger(availableNow=True) .toTable(table_name) )
Se si usa il catalogo Unity, sostituire
<table-name>
con un catalogo, uno schema e un nome di tabella per contenere i record inseriti, ad esempiodata_pipelines.songs_data.raw_song_data
. In caso contrario, sostituire<table-name>
con il nome di una tabella per contenere i record inseriti, ad esempioraw_song_data
.Sostituire
<checkpoint-path>
con un percorso di una directory in DBFS per gestire i file di checkpoint, ad esempio/tmp/pipeline_get_started/_checkpoint/song_data
.Fare clic su
e selezionare Esegui cella. Questo esempio definisce lo schema dei dati usando le informazioni di
README
, inserisce i dati dei brani da tutti i file contenuti infile_path
e scrive i dati nella tabella specificata datable_name
.
Passaggio 4: Preparare i dati non elaborati
Per preparare i dati non elaborati per l'analisi, i passaggi seguenti trasformano i dati dei brani non elaborati filtrando le colonne non necessarie e aggiungendo un nuovo campo contenente un timestamp per la creazione del nuovo record.
Nella barra laterale, fare clic su
Nuovo e selezionare Notebook dal menu. Viene visualizzata la finestra di dialogo Crea notebook.
Immettere un nome per il Notebook. Ad esempio:
Prepare songs data
. Modificare la lingua predefinita in SQL.Nella prima cella del notebook immettere quanto segue:
CREATE OR REPLACE TABLE <table-name> ( artist_id STRING, artist_name STRING, duration DOUBLE, release STRING, tempo DOUBLE, time_signature DOUBLE, title STRING, year DOUBLE, processed_time TIMESTAMP ); INSERT INTO <table-name> SELECT artist_id, artist_name, duration, release, tempo, time_signature, title, year, current_timestamp() FROM <raw-songs-table-name>
Se si usa il catalogo Unity, sostituire
<table-name>
con un catalogo, uno schema e un nome di tabella per contenere i record filtrati e trasformati, ad esempiodata_pipelines.songs_data.prepared_song_data
. In caso contrario, sostituire<table-name>
con il nome di una tabella per contenere i record filtrati e trasformati, ad esempioprepared_song_data
.Sostituire
<raw-songs-table-name>
con il nome della tabella contenente le registrazioni grezze dei brani inserite nel passaggio precedente.Fare clic su
e selezionare Esegui cella.
Passaggio 5: Eseguire una query sui dati trasformati
In questo passaggio si estende la pipeline di elaborazione aggiungendo query per analizzare i dati dei brani. Queste query usano i record preparati creati nel passaggio precedente.
Nella barra laterale, fare clic su
Nuovo e selezionare Notebook dal menu. Viene visualizzata la finestra di dialogo Crea notebook.
Immettere un nome per il Notebook. Ad esempio:
Analyze songs data
. Modificare la lingua predefinita in SQL.Nella prima cella del notebook immettere quanto segue:
-- Which artists released the most songs each year? SELECT artist_name, count(artist_name) AS num_songs, year FROM <prepared-songs-table-name> WHERE year > 0 GROUP BY artist_name, year ORDER BY num_songs DESC, year DESC
Sostituire
<prepared-songs-table-name>
con il nome della tabella contenente i dati preparati. Ad esempio:data_pipelines.songs_data.prepared_song_data
.Fare clic su
nel menu azioni cella, selezionare Aggiungi cella sottostante e inserisci quanto segue nella nuova cella:
-- Find songs for your DJ list SELECT artist_name, title, tempo FROM <prepared-songs-table-name> WHERE time_signature = 4 AND tempo between 100 and 140;
Sostituire
<prepared-songs-table-name>
con il nome della tabella preparata creata nel passaggio precedente. Ad esempio:data_pipelines.songs_data.prepared_song_data
.Per eseguire le query e visualizzare l'output, fare clic su Esegui tutto.
Passaggio 6: Creare un processo di Azure Databricks per eseguire la pipeline
È possibile creare un flusso di lavoro per automatizzare l'esecuzione dei passaggi di inserimento, elaborazione e analisi dei dati usando un processo di Azure Databricks.
- Nell'area di lavoro Data Science & Engineering eseguire una delle operazioni seguenti:
- Fare clic su
Flussi di lavoro nella barra laterale e fare clic su
.
- Nella barra laterale, fai clic su
Nuovo e seleziona Lavoro.
- Fare clic su
- Nella finestra di dialogo della scheda Attività, sostituire Aggiungi un nome per il tuo lavoro... con il nome del tuo lavoro. Ad esempio, "Flusso di lavoro di canzoni".
- In Nome attività immettere un nome per la prima attività, ad esempio
Ingest_songs_data
. - In Tipo selezionare il tipo di attività Notebook .
- In Origine selezionare Area di lavoro.
- Nel campo Percorso, usare il gestore di file per trovare il notebook di inserimento dati, quindi fare clic su Conferma.
- In Calcolo, seleziona la risorsa di calcolo che hai creato nella fase
Create a compute resource
. - Cliccare su Crea.
- Fare clic su
sotto l'attività appena creata e selezionare Notebook.
- In Nome attività immettere un nome per l'attività, ad esempio
Prepare_songs_data
. - In Tipo selezionare il tipo di attività Notebook .
- In Origine selezionare Area di lavoro.
- Usare il browser file per trovare il notebook di preparazione dei dati, fare clic sul nome del notebook e poi su Conferma.
- In Calcolo, selezionare la risorsa di calcolo creata nel passaggio
Create a compute resource
. - Cliccare su Crea.
- Clicca su
sotto l'appena creata attività e seleziona Notebook.
- In Nome attività immettere un nome per l'attività, ad esempio
Analyze_songs_data
. - In Tipo selezionare il tipo di attività Notebook .
- In Origine selezionare Area di lavoro.
- Usare il browser file per trovare il notebook di analisi dei dati, fare clic sul nome del notebook e poi su Conferma.
- In Calcolo, seleziona la risorsa di calcolo che hai creato nel passaggio
Create a compute resource
. - Cliccare su Crea.
- Fare clic su
per eseguire il flusso di lavoro. Per visualizzare i dettagli per l'esecuzione, fare clic sul collegamento nella colonna Ora di inizio per l'esecuzione nella visualizzazione esecuzioni del processo . Fare clic su ogni attività per visualizzare i dettagli per l'esecuzione dell'attività.
- Per visualizzare i risultati al termine del flusso di lavoro, fare clic sull'attività di analisi finale dei dati. Viene visualizzata la pagina Output con i risultati della query.
Passaggio 7: Pianificare il task della pipeline di dati
Nota
Per illustrare l'uso di un processo di Azure Databricks per orchestrare un flusso di lavoro pianificato, questo esempio introduttivo separa i passaggi di inserimento, preparazione e analisi in notebook separati e ogni notebook viene quindi usato per creare un'attività nel processo. Se tutta l'elaborazione è contenuta in un singolo notebook, è possibile pianificare facilmente il notebook direttamente dall'interfaccia utente del notebook di Azure Databricks. Vedere Creare e gestire processi notebook pianificati.
Un requisito comune consiste nell'eseguire una pipeline di dati su base pianificata. Per definire una pianificazione per l'attività che esegue la pipeline:
- Fare clic su
Flussi di lavoro nella barra laterale.
- Nella colonna Nome, fare clic sul nome del lavoro. Nel pannello laterale sono visibili i dettagli dell'incarico.
- Fare clic su Aggiungi trigger nel pannello Dettagli del processo e selezionare Pianificato in Tipo di trigger.
- Specificare il periodo, l'ora di inizio e il fuso orario. Selezionare facoltativamente la casella di controllo Mostra sintassi Cron per visualizzare e modificare il programma in Sintassi Cron Quartz.
- Fare clic su Salva.
Altre informazioni
- Per altre informazioni sui notebook di Databricks, vedere Introduzione ai notebook di Databricks.
- Per altre informazioni sui processi di Azure Databricks, vedere Che cosa sono i processi?.
- Per altre informazioni su Delta Lake, vedere Che cos'è Delta Lake?.
- Per altre informazioni sulle pipeline di elaborazione dati con DLT, vedere Che cos'è DLT?.