Orchestrare i processi di Azure Databricks con Apache Airflow
Questo articolo descrive il supporto di Apache Airflow per l'orchestrazione di pipeline di dati con Azure Databricks, contiene istruzioni per l'installazione e la configurazione di Airflow a livello locale e fornisce un esempio di distribuzione ed esecuzione di un workflow di Azure Databricks con Airflow.
Orchestrazione dei processi in una pipeline di dati
Lo sviluppo e la distribuzione di una pipeline di elaborazione dati spesso richiedono la gestione di dipendenze complesse tra le attività. Ad esempio, una pipeline potrebbe leggere i dati da un'origine, pulire i dati, trasformare i dati puliti e scrivere i dati trasformati in una destinazione. È inoltre necessario un supporto per il test, la programmazione e la risoluzione degli errori quando si rende operativa una pipeline.
I sistemi di workflow affrontano queste sfide consentendo di definire le dipendenze tra le attività, di programmare l'esecuzione delle pipeline e di monitorare i flussi di lavoro. Apache Airflow è una soluzione open source per la gestione e la pianificazione delle pipeline di dati. Airflow rappresenta le pipeline di dati come grafi aciclici diretti (DAG) di operazioni. Si definisce un flusso di lavoro in un file Python e Airflow ne gestisce la programmazione e l'esecuzione. La connessione Airflow di Azure Databricks consente di sfruttare il motore Spark ottimizzato offerto da Azure Databricks con le funzionalità di pianificazione di Airflow.
Requisiti
- L'integrazione tra Airflow e Azure Databricks richiede Airflow nella versione 2.5.0 e successive. Gli esempi in questo articolo vengono testati con Airflow nella versione 2.6.1.
- Airflow richiede Python 3.8, 3.9, 3.10, o 3.11. Gli esempi in questo articolo vengono testati con Python 3.8.
- Le istruzioni in questo articolo per installare ed eseguire Airflow richiedono pipenv per creare un ambiente virtuale Python.
Operatori Airflow per Databricks
Un DAG Airflow è composto da attività, dove ogni attività esegue un Operatore Airflow. Gli operatori Airflow che supportano l'integrazione con Databricks vengono implementati nel provider Databricks.
Il provider Databricks comprende operatori per l'esecuzione di una serie di attività in un'area di lavoro di Azure Databricks, tra cui l'importazione di dati in una tabella, l'esecuzione di query SQL e il lavoro con le cartelle Git di Databricks.
Il provider Databricks implementa due operatori per l'attivazione dei processi:
-
DatabricksRunNowOperator richiede un processo di Azure Databricks esistente e usa la richiesta API POST /api/2.1/jobs/run-now per attivare un'esecuzione. Databricks consiglia di usare
DatabricksRunNowOperator
perché riduce la duplicazione delle definizioni dei processi e le esecuzioni dei processi attivate con questo operatore possono essere trovate nell'interfaccia utente dei processi. - DatabricksSubmitRunOperator non richiede l'esistenza di un processo in Azure Databricks e utilizza la richiesta API POST /api/2.1/jobs/runs/submit per inviare le specifiche del processo e attivare un'esecuzione.
Per creare un nuovo processo di Azure Databricks o reimpostare un processo esistente, il provider Databricks implementa DatabricksCreateJobsOperator. Il DatabricksCreateJobsOperator
usa le richieste API POST /api/2.1/jobs/create e POST /api/2.1/jobs/reset. È possibile usare DatabricksCreateJobsOperator
con DatabricksRunNowOperator
per creare ed eseguire un processo.
Nota
Per utilizzare gli operatori di Databricks per attivare un processo è necessario fornire le credenziali nella configurazione della connessione a Databricks. Consultare Creare un token di accesso personale di Azure Databricks per la connessione Airflow.
Gli operatori Databricks Airflow scrivono l'URL della pagina di esecuzione del processo nei log Airflow ogni polling_period_seconds
(il valore predefinito è 30 secondi). Per altre informazioni, consultare la pagina del pacchetto apache-airflow-providers-databricks nel sito Web Airflow.
Installare l'integrazione di Airflow di Azure Databricks in locale
Per installare Airflow e il provider Databricks in locale per il test e lo sviluppo, seguire questa procedura. Per altre opzioni di installazione di Airflow, inclusa la creazione di un'installazione di produzione, consultare installazione nella documentazione di Airflow.
Aprire un terminale ed eseguire i comandi seguenti:
mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>
Sostituire <firstname>
, <lastname>
, e <email>
con il nome utente e l'e-mail. Verrà richiesto di inserire una password per l'utente amministratore. Assicurarsi di salvare questa password perché è necessaria per accedere all'interfaccia utente di Airflow.
Lo script esegue quindi i passaggi seguenti:
- Crea una directory denominata
airflow
e cambia in tale directory. - Usa
pipenv
per creare e generare un ambiente virtuale Python. Databricks consiglia di utilizzare un ambiente virtuale Python per isolare le versioni dei pacchetti e le dipendenze del codice in quell'ambiente. Questo isolamento aiuta a ridurre le discrepanze tra le versioni dei pacchetti e le interferenze tra le dipendenze del codice. - Inizializza una variabile di ambiente denominata
AIRFLOW_HOME
impostata sul percorso della directoryairflow
. - Installa Airflow e i pacchetti del provider Airflow Databricks.
- Crea una directory
airflow/dags
. Airflow usa ladags
directory per archiviare le definizioni di DAG. - Inizializza un database SQLite usato da Airflow per tenere traccia dei metadati. In una distribuzione Airflow di produzione, si deve configurare Airflow con un database standard. Il database SQLite e la configurazione predefinita per la distribuzione Airflow vengono inizializzati nella directory
airflow
. - Crea un utente amministratore per Airflow.
Suggerimento
Per confermare l'installazione del provider Databricks, eseguire il seguente comando nella directory di installazione di Airflow:
airflow providers list
Avviare il server Web Airflow e l'utilità di pianificazione
Il server Web Airflow è necessario per visualizzare l'interfaccia utente airflow. Per avviare il server Web, aprire un terminale nella directory di installazione di Airflow ed eseguire i seguenti comandi:
Nota
Se l'avvio del server Web Airflow non riesce a causa di un conflitto di porte, è possibile modificare la porta predefinita nella configurazione di Airflow.
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver
L'utilità di pianificazione è il componente Airflow che pianifica i DAG. Per avviare l’unità di pianificazione, aprire un terminale nella directory di installazione di Airflow ed eseguire i seguenti comandi:
pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler
Test dell'installazione di Airflow
Per verificare l'installazione di Airflow, è possibile eseguire uno dei DAG di esempio inclusi in Airflow:
- In una finestra del browser aprire
http://localhost:8080/home
. Accedere all'interfaccia utente di Airflow con il nome utente e la password creati durante l'installazione di Airflow. Viene visualizzata la pagina DaG di Airflow. - Fare clic sull'interruttore DAG Pausa/Riattiva per annullare la sospensione di uno dei gruppi di disponibilità di esempio, ad esempio
example_python_operator
. - Attivare il DAG di esempio facendo clic sul pulsante Trigger DAG.
- Fare clic sul nome del DAG per visualizzare i dettagli, incluso lo stato di esecuzione del DAG.
Creare un token di accesso personale di Azure Databricks per Airflow
Airflow si connette a Databricks usando un token di accesso personale di Azure Databricks (PAT). Per creare un token di accesso personale personale di Azure Databricks, seguire i passaggi per gli utenti dell'area di lavoro.
Nota
Come procedura consigliata per la sicurezza, quando si esegue l'autenticazione con strumenti automatizzati, sistemi, script e app, Databricks consiglia di usare token di accesso personali appartenenti alle entità servizio, anziché agli utenti dell'area di lavoro. Per creare token per le entità servizio, consultare Gestire i token per un'entità servizio.
È anche possibile eseguire l'autenticazione in Azure Databricks usando un token Microsoft Entra ID. Consultare Connessione a Databricks nella documentazione di Airflow.
Configurare una connessione Azure Databricks
L'installazione di Airflow contiene una connessione predefinita per Azure Databricks. Per aggiornare la connessione per connettersi all'area di lavoro usando il token di accesso personale creato in precedenza:
- In una finestra del browser aprire
http://localhost:8080/connection/list/
. Se viene richiesto di effettuare l'accesso, inserire il nome utente e la password dell'amministratore. - In ID Conn, individuare databricks_default e fare clic sul pulsante Modifica record.
- Sostituire il valore nel campo Host con il nome dell'istanza dell'area di lavoro della distribuzione di Azure Databricks, ad esempio
https://adb-123456789.cloud.databricks.com
. - Nel campo Password immettere il token di accesso personale di Azure Databricks.
- Fare clic su Salva.
Se si usa un token ID Microsoft Entra, consultare Connessione a Databricks nella documentazione di Airflow per informazioni sulla configurazione dell'autenticazione.
Esempio: Creare un DAG airflow per eseguire un processo di Azure Databricks
Il seguente esempio mostra come creare una semplice distribuzione Airflow che viene eseguita sul computer locale e implementa un DAG di esempio per attivare le esecuzioni in Azure Databricks. Nel seguente esempio, si eseguirà quanto segue:
- Creare un nuovo notebook e aggiungere codice per stampare un messaggio di saluto in base a un parametro configurato.
- Creare un processo di Azure Databricks con una singola attività che esegue il notebook.
- Configurare una connessione Airflow all'area di lavoro di Azure Databricks.
- Creare un DAG Airflow per attivare il processo del notebook. Il dag viene definito in uno script Python usando
DatabricksRunNowOperator
. - Usare l'interfaccia utente Airflow per attivare il DAG e visualizzare lo stato dell'esecuzione.
Creare un notebook
Questo esempio usa un notebook contenente due celle:
- La prima cella contiene un widget di testo Databricks Utilities che definisce una variabile denominata
greeting
impostata sul valore predefinitoworld
. - La seconda cella stampa il valore della variabile
greeting
preceduta dahello
.
Per creare il notebook:
Passare all'area di lavoro di Azure Databricks, fare clic su Nuovo nella barra laterale e selezionare Notebook.
Assegnare un nome al notebook, ad esempio Hello Airflow, e assicurarsi che il linguaggio predefinito sia impostato su Python.
Copiare il seguente codice Python e incollarlo nella prima cella del notebook.
dbutils.widgets.text("greeting", "world", "Greeting") greeting = dbutils.widgets.get("greeting")
Aggiungere una nuova cella sotto la prima e copiare e incollare il seguente codice Python nella nuova cella:
print("hello {}".format(greeting))
Creare un processo
Cliccare Flussi di lavoro nella barra laterale.
Cliccare .
Viene visualizzata la scheda Attività con la finestra di dialogo Crea attività.
Sostituire Aggiungi un nome per il processo… con il nome del processo.
Nel campo Nome attività immettere un nome per l'attività, ad esempio greeting-task.
Nel menu a discesa Tipo selezionare Notebook.
Nel menu a discesa Origine, selezionare Area di lavoro.
Fare clic sulla casella di testo Percorso e utilizzare il browser dei file per trovare il notebook creato, fare clic sul nome del notebook e poi su Conferma.
Fare clic su Aggiungi in Parametri. Nel campo Chiave immettere
greeting
. Nel campo Valore immettereAirflow user
.Fare clic su Crea token.
Nel pannello Dettagli processo copiare il valore ID processo. Questo valore è necessario per attivare il processo da Airflow.
Eseguire il processo
Per testare il nuovo processo nell'interfaccia utente dei processi di Azure Databricks, fare clic su nell'angolo in alto a destra. Al termine dell'esecuzione, è possibile verificare l'output visualizzando i dettagli dell'esecuzione del processo.
Creare un nuovo DAG Airflow
Si definisce un DAG Airflow in un file Python. Per creare un DAG per attivare il processo del notebook di esempio:
In un editor di testo o in un IDE, create un nuovo file denominato
databricks_dag.py
con i seguenti contenuti:from airflow import DAG from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator from airflow.utils.dates import days_ago default_args = { 'owner': 'airflow' } with DAG('databricks_dag', start_date = days_ago(2), schedule_interval = None, default_args = default_args ) as dag: opr_run_now = DatabricksRunNowOperator( task_id = 'run_now', databricks_conn_id = 'databricks_default', job_id = JOB_ID )
Sostituire
JOB_ID
con il valore dell'ID processo salvato in precedenza.Salvare questo file nella directory
airflow/dags
. Airflow legge e installa automaticamente i file DAG archiviati inairflow/dags/
.
Installare e verificare il DAG in Airflow
Per attivare e verificare il DAG nell'interfaccia utente Airflow:
- In una finestra del browser aprire
http://localhost:8080/home
. Viene visualizzata la schermata DaG Airflow. - Individuare
databricks_dag
e fare clic sull'interruttore DAG Pausa/Riattiva per rimuovere il DAG. - Attivare il DAG facendo clic sul pulsante Trigger DAG.
- Fare clic su un'esecuzione nella colonna Esecuzioni per visualizzare lo stato e i dettagli dell'esecuzione.