Condividi tramite


Crea una pipeline di analisi end-to-end in Databricks

Questo articolo illustra come creare e distribuire una pipeline di elaborazione dati end-to-end, tra cui come inserire dati non elaborati, trasformare i dati ed eseguire analisi sui dati elaborati.

Nota

Anche se questo articolo illustra come creare una pipeline di dati completa usando Databricks notebook e un processo di Azure Databricks per orchestrare un flusso di lavoro, Databricks consiglia di usare DLT, un'interfaccia dichiarativa per la creazione di pipeline di elaborazione dati affidabili, gestibili e testabili.

Che cos'è una pipeline di dati?

Una pipeline di dati implementa i passaggi necessari per spostare i dati dai sistemi di origine, trasformare i dati in base ai requisiti e archiviare i dati in un sistema di destinazione. Una pipeline di dati include tutti i processi necessari per trasformare i dati non elaborati in dati preparati che gli utenti possono utilizzare. Ad esempio, una pipeline di dati potrebbe preparare i dati in modo che gli analisti dei dati e i data scientist possano estrarre valore da essi tramite analisi e creazione di report.

Un flusso di lavoro di estrazione, trasformazione e caricamento (ETL) è un esempio comune di una pipeline di dati. Nell'elaborazione ETL i dati vengono inseriti dai sistemi di origine e scritti in un'area di gestione temporanea, trasformati in base ai requisiti (garantire la qualità dei dati, deduplicare i record e così via) e quindi scritti in un sistema di destinazione, ad esempio un data warehouse o un data lake.

Passaggi del flusso di dati

Per iniziare a creare pipeline di dati in Azure Databricks, l'esempio incluso in questo articolo illustra come creare un flusso di lavoro di elaborazione dati:

  • Usare le funzionalità di Azure Databricks per esplorare un set di dati non elaborato.
  • Creare un notebook di Databricks per inserire dati di origine non elaborati e scrivere i dati non elaborati in una tabella di destinazione.
  • Creare un notebook di Databricks per trasformare i dati di origine non elaborati e scrivere i dati trasformati in una tabella di destinazione.
  • Creare un notebook di Databricks per eseguire una query sui dati trasformati.
  • Automatizzare la pipeline di dati con un job di Azure Databricks.

Requisiti

Esempio: Set di dati Million Song

Il set di dati usato in questo esempio è un subset di Million Song Dataset, una raccolta di caratteristiche e metadati per le tracce musicali contemporanee. Questo set di dati è disponibile nei set di dati di esempio inclusi nell'area di lavoro di Azure Databricks.

Passaggio 1: Creare una risorsa di calcolo

Per eseguire l'elaborazione e l'analisi dei dati in questo esempio, creare una risorsa di calcolo per eseguire i comandi.

Nota

Poiché questo esempio usa un set di dati di esempio archiviato in DBFS e consiglia di rendere persistenti le tabelle per Catalogo Unity, si crea una risorsa di calcolo configurata con modalità di accesso dedicato. La modalità di accesso dedicato consente l'accesso completo a DBFS, consentendo anche l'accesso a Unity Catalog. Vedere Procedure consigliate per DBFS e il catalogo Unity.

  1. Nella barra laterale fare clic su Ambiente di calcolo.
  2. Nella pagina Calcolo, fare clic su Crea calcolo.
  3. Nella nuova pagina di calcolo immettere un nome univoco per la risorsa di calcolo.
  4. In Advancedimpostare la modalità di accesso su Manuale, quindi selezionare Dedicato.
  5. In singolo utente o gruppo, seleziona il tuo nome utente.
  6. Lasciare i valori rimanenti nello stato predefinito e fare clic su Crea.

Per altre informazioni sulle risorse di calcolo di Databricks, vedere Compute.

Passaggio 2: Esplorare i dati di origine

Per informazioni su come usare l'interfaccia di Azure Databricks per esplorare i dati di origine non elaborati, vedere Esplorare i dati di origine per una pipeline di dati. Se si vuole passare direttamente all'inserimento e alla preparazione dei dati, continuare con il Passaggio 3: Inserire i dati non elaborati.

Passaggio 3: Inserire i dati non elaborati

In questo passaggio i dati non elaborati vengono caricati in una tabella per renderli disponibili per un'ulteriore elaborazione. Per gestire gli asset di dati nella piattaforma Databricks, ad esempio le tabelle, Databricks consiglia Unity Catalog. Tuttavia, se non si dispone delle autorizzazioni per creare il catalogo e lo schema necessari per pubblicare le tabelle nel catalogo Unity, è comunque possibile completare i passaggi seguenti pubblicando le tabelle nel metastore Hive.

Databricks consiglia di usare il caricatore automatico per l'inserimento dei dati. Il caricatore automatico rileva ed elabora automaticamente nuovi file quando arrivano nell'archiviazione di oggetti cloud.

È possibile configurare il caricatore automatico per rilevare automaticamente lo schema dei dati caricati, consentendo di inizializzare le tabelle senza dichiarare in modo esplicito lo schema dei dati ed evolvere lo schema della tabella man mano che vengono introdotte nuove colonne. In questo modo si elimina la necessità di tenere traccia e applicare manualmente le modifiche dello schema nel tempo. Databricks consiglia l'inferenza dello schema quando si utilizza il caricatore automatico. Tuttavia, come illustrato nel passaggio di esplorazione dei dati, i dati dei brani non contengono informazioni sull'intestazione. Poiché l'intestazione non viene archiviata con i dati, è necessario definire in modo esplicito lo schema, come illustrato nell'esempio seguente.

  1. Nella barra laterale, fare clic su Nuova iconaNuovo e selezionare Notebook dal menu. Viene visualizzata la finestra di dialogo Crea notebook.

  2. Immettere un nome per il notebook, ad esempio Ingest songs data. Per impostazione predefinita:

  3. Immettere quanto segue nella prima cella del notebook:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Se si usa il catalogo Unity, sostituire <table-name> con un catalogo, uno schema e un nome di tabella per contenere i record inseriti, ad esempio data_pipelines.songs_data.raw_song_data. In caso contrario, sostituire <table-name> con il nome di una tabella per contenere i record inseriti, ad esempio raw_song_data.

    Sostituire <checkpoint-path> con un percorso di una directory in DBFS per gestire i file di checkpoint, ad esempio /tmp/pipeline_get_started/_checkpoint/song_data.

  4. Fare clic su Esegui Menu e selezionare Esegui cella. Questo esempio definisce lo schema dei dati usando le informazioni di README, inserisce i dati dei brani da tutti i file contenuti in file_path e scrive i dati nella tabella specificata da table_name.

Passaggio 4: Preparare i dati non elaborati

Per preparare i dati non elaborati per l'analisi, i passaggi seguenti trasformano i dati dei brani non elaborati filtrando le colonne non necessarie e aggiungendo un nuovo campo contenente un timestamp per la creazione del nuovo record.

  1. Nella barra laterale, fare clic su Nuova iconaNuovo e selezionare Notebook dal menu. Viene visualizzata la finestra di dialogo Crea notebook.

  2. Immettere un nome per il Notebook. Ad esempio: Prepare songs data. Modificare la lingua predefinita in SQL.

  3. Nella prima cella del notebook immettere quanto segue:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Se si usa il catalogo Unity, sostituire <table-name> con un catalogo, uno schema e un nome di tabella per contenere i record filtrati e trasformati, ad esempio data_pipelines.songs_data.prepared_song_data. In caso contrario, sostituire <table-name> con il nome di una tabella per contenere i record filtrati e trasformati, ad esempio prepared_song_data.

    Sostituire <raw-songs-table-name> con il nome della tabella contenente le registrazioni grezze dei brani inserite nel passaggio precedente.

  4. Fare clic su Esegui Menu e selezionare Esegui cella.

Passaggio 5: Eseguire una query sui dati trasformati

In questo passaggio si estende la pipeline di elaborazione aggiungendo query per analizzare i dati dei brani. Queste query usano i record preparati creati nel passaggio precedente.

  1. Nella barra laterale, fare clic su Nuova iconaNuovo e selezionare Notebook dal menu. Viene visualizzata la finestra di dialogo Crea notebook.

  2. Immettere un nome per il Notebook. Ad esempio: Analyze songs data. Modificare la lingua predefinita in SQL.

  3. Nella prima cella del notebook immettere quanto segue:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    Sostituire <prepared-songs-table-name> con il nome della tabella contenente i dati preparati. Ad esempio: data_pipelines.songs_data.prepared_song_data.

  4. Fare clic su Freccia giù nel menu azioni cella, selezionare Aggiungi cella sottostante e inserisci quanto segue nella nuova cella:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    Sostituire <prepared-songs-table-name> con il nome della tabella preparata creata nel passaggio precedente. Ad esempio: data_pipelines.songs_data.prepared_song_data.

  5. Per eseguire le query e visualizzare l'output, fare clic su Esegui tutto.

Passaggio 6: Creare un processo di Azure Databricks per eseguire la pipeline

È possibile creare un flusso di lavoro per automatizzare l'esecuzione dei passaggi di inserimento, elaborazione e analisi dei dati usando un processo di Azure Databricks.

  1. Nell'area di lavoro Data Science & Engineering eseguire una delle operazioni seguenti:
    • Fare clic su Icona Flussi di lavoroFlussi di lavoro nella barra laterale e fare clic su Pulsante Crea attività.
    • Nella barra laterale, fai clic su Nuova iconaNuovo e seleziona Lavoro.
  2. Nella finestra di dialogo della scheda Attività, sostituire Aggiungi un nome per il tuo lavoro... con il nome del tuo lavoro. Ad esempio, "Flusso di lavoro di canzoni".
  3. In Nome attività immettere un nome per la prima attività, ad esempio Ingest_songs_data.
  4. In Tipo selezionare il tipo di attività Notebook .
  5. In Origine selezionare Area di lavoro.
  6. Nel campo Percorso, usare il gestore di file per trovare il notebook di inserimento dati, quindi fare clic su Conferma.
  7. In Calcolo, seleziona la risorsa di calcolo che hai creato nella fase Create a compute resource.
  8. Cliccare su Crea.
  9. Fare clic su Pulsante Aggiungi attività sotto l'attività appena creata e selezionare Notebook.
  10. In Nome attività immettere un nome per l'attività, ad esempio Prepare_songs_data.
  11. In Tipo selezionare il tipo di attività Notebook .
  12. In Origine selezionare Area di lavoro.
  13. Usare il browser file per trovare il notebook di preparazione dei dati, fare clic sul nome del notebook e poi su Conferma.
  14. In Calcolo, selezionare la risorsa di calcolo creata nel passaggio Create a compute resource.
  15. Cliccare su Crea.
  16. Clicca su Pulsante Aggiungi attività sotto l'appena creata attività e seleziona Notebook.
  17. In Nome attività immettere un nome per l'attività, ad esempio Analyze_songs_data.
  18. In Tipo selezionare il tipo di attività Notebook .
  19. In Origine selezionare Area di lavoro.
  20. Usare il browser file per trovare il notebook di analisi dei dati, fare clic sul nome del notebook e poi su Conferma.
  21. In Calcolo, seleziona la risorsa di calcolo che hai creato nel passaggio Create a compute resource.
  22. Cliccare su Crea.
  23. Fare clic su pulsante per eseguire il flusso di lavoro. Per visualizzare i dettagli per l'esecuzione, fare clic sul collegamento nella colonna Ora di inizio per l'esecuzione nella visualizzazione esecuzioni del processo . Fare clic su ogni attività per visualizzare i dettagli per l'esecuzione dell'attività.
  24. Per visualizzare i risultati al termine del flusso di lavoro, fare clic sull'attività di analisi finale dei dati. Viene visualizzata la pagina Output con i risultati della query.

Passaggio 7: Pianificare il task della pipeline di dati

Nota

Per illustrare l'uso di un processo di Azure Databricks per orchestrare un flusso di lavoro pianificato, questo esempio introduttivo separa i passaggi di inserimento, preparazione e analisi in notebook separati e ogni notebook viene quindi usato per creare un'attività nel processo. Se tutta l'elaborazione è contenuta in un singolo notebook, è possibile pianificare facilmente il notebook direttamente dall'interfaccia utente del notebook di Azure Databricks. Vedere Creare e gestire processi notebook pianificati.

Un requisito comune consiste nell'eseguire una pipeline di dati su base pianificata. Per definire una pianificazione per l'attività che esegue la pipeline:

  1. Fare clic su Icona Flussi di lavoroFlussi di lavoro nella barra laterale.
  2. Nella colonna Nome, fare clic sul nome del lavoro. Nel pannello laterale sono visibili i dettagli dell'incarico.
  3. Fare clic su Aggiungi trigger nel pannello Dettagli del processo e selezionare Pianificato in Tipo di trigger.
  4. Specificare il periodo, l'ora di inizio e il fuso orario. Selezionare facoltativamente la casella di controllo Mostra sintassi Cron per visualizzare e modificare il programma in Sintassi Cron Quartz.
  5. Fare clic su Salva.

Altre informazioni