Eseguire una pipeline Delta Live Tables in un flusso di lavoro.
È possibile eseguire una pipeline di tabelle live Delta come parte di un flusso di lavoro di elaborazione dati con processi di Databricks, Apache Airflow o Azure Data Factory.
Processi
È possibile orchestrare più attività in un processo di Databricks per implementare un flusso di lavoro di elaborazione dati. Per includere una pipeline Delta Live Tables in un processo, utilizzare l'attività Pipeline quando si crea un processo. Per il processo, vedere il compito della pipeline Tabelle Live Delta
Apache Airflow
Apache Airflow è una soluzione open source per la gestione e la pianificazione dei flussi di lavoro di dati. Airflow rappresenta i flussi di lavoro come grafi aciclici diretti (DAG) di operazioni. Si definisce un flusso di lavoro in un file Python e Airflow ne gestisce la programmazione e l'esecuzione. Per informazioni sull'installazione e l'uso di Airflow con Azure Databricks, vedere Orchestrare i processi di Azure Databricks con Apache Airflow.
Per eseguire una pipeline Delta Live Tables come parte di un flusso di lavoro Airflow, usare il DatabricksSubmitRunOperator.
Requisiti
Per usare il supporto di Airflow per Delta Live Tables, è necessario quanto segue:
- Airflow versione 2.1.0 o successive.
- Pacchetto del provider Databricks versione 2.1.0 o successiva.
Esempio
Nell'esempio seguente viene creato un DAG Airflow che attiva un aggiornamento per la pipeline di Delta Live Tables con l'identificatore 8279d543-063c-4d63-9926-dae38e35ce8b
:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('dlt',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
Sostituire CONNECTION_ID
con l'identificatore per una connessione Airflow alla propria area di lavoro.
Salvare questo esempio nella directory airflow/dags
e usare l'interfaccia utente Airflow per visualizzare e attivare il DAG. Utilizzare la UI di Delta Live Tables per visualizzare i dettagli dell'aggiornamento della pipeline.
Azure Data Factory
Nota
Delta Live Tables e Azure Data Factory includono opzioni per configurare il numero di tentativi quando si verifica un errore. Se i valori di ripetizione dei tentativi sono configurati nella pipeline di tabelle live Delta e nell'attività di Azure Data Factory che chiama la pipeline, il numero di tentativi è il valore di ripetizione dei tentativi di Azure Data Factory moltiplicato per il valore di ripetizione dei tentativi delle tabelle live Delta.
Ad esempio, se un aggiornamento della pipeline ha esito negativo, Delta Live Tables ritenta l'aggiornamento fino a cinque volte di default. Se la ripetizione dei tentativi di Azure Data Factory è impostata su tre e la pipeline di tabelle live Delta usa il valore predefinito di cinque tentativi, la pipeline delta live tables non riuscita potrebbe essere ritentata fino a quindici volte. Per evitare tentativi eccessivi quando gli aggiornamenti della pipeline hanno esito negativo, Databricks consiglia di limitare il numero di tentativi durante la configurazione della pipeline di tabelle live Delta o dell'attività di Azure Data Factory che chiama la pipeline.
Per modificare la configurazione dei ritentativi per la pipeline di tabelle live Delta, utilizza l'impostazione pipelines.numUpdateRetryAttempts
durante la configurazione della pipeline.
Azure Data Factory è un servizio di ETL basato sul cloud che consente di orchestrare i flussi di lavoro di integrazione e trasformazione dei dati. Azure Data Factory supporta direttamente l'esecuzione di attività di Azure Databricks in un flusso di lavoro, inclusi notebook, attività JAR e script Python. È anche possibile includere una pipeline in un flusso di lavoro chiamando l'API Delta Live Tables da un'attività Web di Azure Data Factory . Ad esempio, per attivare un aggiornamento della pipeline da Azure Data Factory:
Creare una data factory o aprire una data factory esistente.
Al termine della creazione, aprire la pagina per la data factory e fare clic sul riquadro Apri Azure Data Factory Studio. Viene visualizzata l'interfaccia utente di Azure Data Factory.
Creare una nuova pipeline di Azure Data Factory selezionando Pipeline dal menu a discesa Nuovo nell'interfaccia utente di Azure Data Factory Studio.
Nella casella degli strumenti Attività espandere Generale e trascinare l'attività Web nell'area di progettazione della pipeline. Fare clic sulla scheda Impostazioni
e immettere i valori seguenti: Nota
Come procedura consigliata per la sicurezza, quando si esegue l'autenticazione con strumenti automatizzati, sistemi, script e app, Databricks consiglia di usare token di accesso personali appartenenti alle entità servizio, anziché agli utenti dell'area di lavoro. Per creare token per le entità servizio, consultare Gestire i token per un'entità servizio.
URL:
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates
.Sostituire
<get-workspace-instance>
.Sostituire
<pipeline-id>
con l'identificatore della pipeline.Metodo: selezionare POST dal menu a discesa.
Intestazioni: fare clic su + Nuovo. Nella casella di testo Nome immettere
Authorization
. Nella casella di testo Valore immettereBearer <personal-access-token>
.Sostituire
<personal-access-token>
con un token di accesso personale di Azure Databricks.Body: per passare parametri di richiesta aggiuntivi, immettere un documento JSON contenente i parametri. Ad esempio, per avviare un aggiornamento e rielaborare tutti i dati per la pipeline:
{"full_refresh": "true"}
. Se non sono presenti parametri di richiesta aggiuntivi, immettere parentesi graffe vuote ({}
).
Per testare l'attività Web, fare clic su Debug sulla barra degli strumenti della pipeline nell'interfaccia utente di Data Factory. L'output e lo stato dell'esecuzione, inclusi gli errori, vengono visualizzati nella scheda Output della pipeline di Azure Data Factory. Utilizzare l'interfaccia utente di Delta Live Tables per visualizzare i dettagli dell'aggiornamento della pipeline.
Suggerimento
Un requisito comune del flusso di lavoro consiste nell'avviare un'attività dopo il completamento di un'attività precedente. Poiché la richiesta delta live tables updates
è asincrona, ovvero la richiesta viene restituita dopo l'avvio dell'aggiornamento, ma prima del completamento dell'aggiornamento, le attività nella pipeline di Azure Data Factory con una dipendenza dall'aggiornamento delle tabelle live Delta devono attendere il completamento dell'aggiornamento. Un'opzione per attendere il completamento dell'aggiornamento consiste nell'aggiungere un'attività Until dopo l'attività Web che attiva l'aggiornamento delle tabelle Live Delta. Nell'attività Until:
- Aggiungere un 'attività Wait per attendere un numero di secondi configurato per il completamento dell'aggiornamento.
- Aggiungere un'attività Web dopo l'attività di attesa che utilizza la richiesta di dettagli di aggiornamento delle tabelle live Delta per ottenere lo stato dell'aggiornamento. Il campo
state
nella risposta restituisce lo stato corrente dell'aggiornamento, incluso se è stato completato. - Utilizzare il valore del campo
state
per impostare la condizione di terminazione per l'attività Until. È anche possibile usare un'attività Impostare la Variabile per aggiungere una variabile della pipeline in base al valorestate
e usare questa variabile per la condizione di terminazione.