Köra en befintlig pipeline med Workflow Orchestration Manager
GÄLLER FÖR: Azure Data Factory Azure Synapse Analytics
Dricks
Prova Data Factory i Microsoft Fabric, en allt-i-ett-analyslösning för företag. Microsoft Fabric omfattar allt från dataflytt till datavetenskap, realtidsanalys, business intelligence och rapportering. Lär dig hur du startar en ny utvärderingsversion kostnadsfritt!
Kommentar
Workflow Orchestration Manager drivs av Apache Airflow.
Kommentar
Workflow Orchestration Manager för Azure Data Factory förlitar sig på öppen källkod Apache Airflow-programmet. Dokumentation och fler självstudier för Airflow finns på Apache Airflow-dokumentationen eller community-sidorna.
Data Factory-pipelines tillhandahåller över 100 anslutningsappar för datakällor som ger skalbar och tillförlitlig dataintegrering/dataflöden. Det finns scenarier där du vill köra en befintlig datafabrikspipeline från Apache Airflow DAG. Den här självstudien visar hur du gör just det.
Förutsättningar
- Azure-prenumeration. Om du inte har en Azure-prenumeration kan du skapa ett kostnadsfritt Azure-konto innan du börjar.
- Azure Storage-konto. Om du inte har ett lagringskonto finns det anvisningar om hur du skapar ett i Skapa ett Azure Storage-konto. Kontrollera att lagringskontot endast tillåter åtkomst från valda nätverk.
- Azure Data Factory-pipeline. Du kan följa någon av självstudierna och skapa en ny datafabrikspipeline om du inte redan har en, eller skapa en med ett val i Kom igång och prova din första datafabrikspipeline.
- Konfigurera ett huvudnamn för tjänsten. Du måste skapa ett nytt huvudnamn för tjänsten eller använda ett befintligt och ge det behörighet att köra pipelinen (till exempel deltagarrollen i datafabriken där befintliga pipelines finns), även om arbetsflödesorkestreringshanterarens miljö och pipelines finns i samma datafabrik. Du måste hämta tjänstens huvudnamns klient-ID och klienthemlighet (API-nyckel).
Steg
Skapa en ny Python-fil adf.py med innehållet nedan:
from datetime import datetime, timedelta from airflow.models import DAG, BaseOperator try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from airflow.utils.edgemodifier import Label with DAG( dag_id="example_adf_run_pipeline", start_date=datetime(2022, 5, 14), schedule_interval="@daily", catchup=False, default_args={ "retries": 1, "retry_delay": timedelta(minutes=3), "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI "factory_name": "<FactoryName>", # This can also be specified in the ADF connection. "resource_group_name": "<ResourceGroupName>", # This can also be specified in the ADF connection. }, default_view="graph", ) as dag: begin = EmptyOperator(task_id="begin") end = EmptyOperator(task_id="end") # [START howto_operator_adf_run_pipeline] run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline1", pipeline_name="<PipelineName>", parameters={"myParam": "value"}, ) # [END howto_operator_adf_run_pipeline] # [START howto_operator_adf_run_pipeline_async] run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline2", pipeline_name="<PipelineName>", wait_for_termination=False, ) pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor( task_id="pipeline_run_sensor", run_id=run_pipeline2.output["run_id"], ) # [END howto_operator_adf_run_pipeline_async] begin >> Label("No async wait") >> run_pipeline1 begin >> Label("Do async wait with sensor") >> run_pipeline2 [run_pipeline1, pipeline_run_sensor] >> end # Task dependency created via `XComArgs`: # run_pipeline2 >> pipeline_run_sensor
Du måste skapa anslutningen med hjälp av UI-administratören för Arbetsflödesorkestreringshanteraren –> Anslutningar –> "+" –> Välj "Anslutningstyp" som "Azure Data Factory" och fyll sedan i din client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name och pipeline_name.
Ladda upp adf.py-filen till bloblagringen i en mapp med namnet DAGS.
Importera MAPPEN DAGS till arbetsflödesorkestreringshanterarens miljö. Om du inte har en skapar du en ny