Köra en Delta Live Tables-pipeline i ett arbetsflöde
Du kan köra en Delta Live Tables-pipeline som en del av ett arbetsflöde för databearbetning med Databricks-jobb, Apache Airflow eller Azure Data Factory.
Projekt
Du kan orkestrera flera uppgifter i ett Databricks-jobb för att implementera ett arbetsflöde för databearbetning. Om du vill inkludera en Delta Live Tables-pipeline i ett jobb använder du uppgiften Pipeline när du skapar ett jobb. Se uppgift för Delta Live Tables-pipeline för jobb.
Apache Airflow
Apache Airflow är en öppen källkod lösning för att hantera och schemalägga dataarbetsflöden. Airflow representerar arbetsflöden som riktade acykliska grafer (DAG) för åtgärder. Du definierar ett arbetsflöde i en Python-fil och Airflow hanterar schemaläggning och körning. Information om hur du installerar och använder Airflow med Azure Databricks finns i Orchestrate Azure Databricks-jobb med Apache Airflow.
Om du vill köra en Delta Live Tables-pipeline som en del av ett Airflow-arbetsflöde använder du DatabricksSubmitRunOperator.
Krav
Följande krävs för att använda Airflow-stöd för Delta Live Tables:
- Airflow version 2.1.0 eller senare.
- Databricks-providerpaketet version 2.1.0 eller senare.
Exempel
I följande exempel skapas en Airflow DAG som utlöser en uppdatering för Delta Live Tables-pipelinen med identifieraren 8279d543-063c-4d63-9926-dae38e35ce8b
:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('dlt',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
Ersätt CONNECTION_ID
med identifieraren för en Airflow-anslutning till din arbetsyta.
Spara det här exemplet i airflow/dags
katalogen och använd airflow-användargränssnittet för att visa och utlösa DAG. Använd Delta Live Tables-användargränssnittet för att visa information om pipelineuppdateringen.
Azure Data Factory
Kommentar
Delta Live Tables och Azure Data Factory innehåller alternativ för att konfigurera antalet återförsök när ett fel inträffar. Om återförsöksvärdena har konfigurerats på din Delta Live Tables-pipeline och i Azure Data Factory-aktiviteten som anropar pipelinen, beräknas antalet återförsök som återförsöksvärdet i Azure Data Factory multiplicerat med återförsöksvärdet för Delta Live Tables.
Om en pipelineuppdatering till exempel misslyckas försöker Delta Live Tables uppdatera igen upp till fem gånger som standard. Om Azure Data Factory-återförsöket är inställt på tre och din Delta Live Tables-pipeline använder standardvärdet fem återförsök, kan din misslyckade Delta Live Tables-pipeline försöka igen upp till femton gånger. För att undvika alltför stora återförsök när pipelineuppdateringar misslyckas rekommenderar Databricks att du begränsar antalet återförsök när du konfigurerar Delta Live Tables-pipelinen eller Azure Data Factory-aktiviteten som anropar pipelinen.
Om du vill ändra konfigurationen för återförsök för din Delta Live Tables-pipeline använder du inställningen pipelines.numUpdateRetryAttempts
när du konfigurerar pipelinen.
Azure Data Factory är en molnbaserad ETL-tjänst som låter dig samordna arbetsflöden för dataintegrering och transformering. Azure Data Factory har direkt stöd för att köra Azure Databricks-uppgifter i ett arbetsflöde, inklusive notebook-filer, JAR-uppgifter och Python-skript. Du kan också inkludera en pipeline i ett arbetsflöde genom att anropa Delta Live Tables API från en Azure Data Factory-webbaktivitet. Om du till exempel vill utlösa en pipelineuppdatering från Azure Data Factory:
Skapa en datafabrik eller öppna en befintlig datafabrik.
När skapandet är klart öppnar du sidan för datafabriken och klickar på panelen Öppna Azure Data Factory Studio . Användargränssnittet för Azure Data Factory visas.
Skapa en ny Azure Data Factory-pipeline genom att välja Pipeline i den nedrullningsbara menyn Ny i Användargränssnittet för Azure Data Factory Studio.
I verktygslådan Aktiviteter expanderar du Allmänt och drar webbaktiviteten till pipelinearbetsytan. Klicka på fliken Inställningar och ange följande värden:
Kommentar
När du autentiserar med automatiserade verktyg, system, skript och appar rekommenderar Databricks att du använder personliga åtkomsttoken som tillhör tjänstens huvudnamn i stället för arbetsyteanvändare. Information om hur du skapar token för tjänstens huvudnamn finns i Hantera token för tjänstens huvudnamn.
URL:
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates
.Ersätt
<get-workspace-instance>
.Ersätt
<pipeline-id>
med pipelineidentifieraren.Metod: Välj POST- i den nedrullningsbara menyn.
Rubriker: Klicka på + Ny. I textrutan Namn anger du
Authorization
. I textrutan Värde anger duBearer <personal-access-token>
.Ersätt
<personal-access-token>
med en personlig åtkomsttoken för Azure Databricks.Body: Om du vill skicka ytterligare parametrar för begäran anger du ett JSON-dokument som innehåller parametrarna. Om du till exempel vill starta en uppdatering och omarbeta alla data för pipelinen:
{"full_refresh": "true"}
. Om det inte finns några ytterligare parametrar för begäran anger du tomma klammerparenteser ({}
).
Om du vill testa webbaktiviteten klickar du på Felsöka i pipelineverktygsfältet i Data Factory-användargränssnittet. Körningens utdata och status, inklusive fel, visas på fliken Utdata i Azure Data Factory-pipelinen. Använd Delta Live Tables-användargränssnittet för att visa information om pipelineuppdateringen.
Dricks
Ett vanligt krav för arbetsflödet är att starta en uppgift efter att en tidigare aktivitet har slutförts. Eftersom Delta Live Tables-updates
begäran är asynkron – begäran returneras när uppdateringen har startats men innan uppdateringen är klar – måste uppgifter i Azure Data Factory-pipelinen med ett beroende av Delta Live Tables-uppdateringen vänta tills uppdateringen har slutförts. Ett alternativ för att vänta tills uppdateringen har slutförts är att lägga till aktiviteten Tills efter den webbaktivitet som utlöser uppdateringen av Delta Live Tables. I aktiviteten Till:
- Lägg till en vänta-aktivitet för att vänta ett konfigurerat antal sekunder för att uppdateringen ska slutföras.
- Lägg till en webbaktivitet efter wait-aktiviteten som använder begäran om uppdateringsinformation för Delta Live Tables för att hämta status för uppdateringen. Fältet
state
i svaret returnerar uppdateringens aktuella tillstånd, inklusive om den har slutförts. - Använd värdet för fältet
state
för att ange avslutande villkor för aktiviteten Till. Du kan också använda en Ange variabelaktivitet för att lägga till en pipeline-variabel baserat på värdetstate
och använda den här variabeln för avslutsvillkoret.