แก้ไข

แชร์ผ่าน


Run an existing pipeline with Workflow Orchestration Manager

APPLIES TO: Azure Data Factory Azure Synapse Analytics

Tip

Try out Data Factory in Microsoft Fabric, an all-in-one analytics solution for enterprises. Microsoft Fabric covers everything from data movement to data science, real-time analytics, business intelligence, and reporting. Learn how to start a new trial for free!

Note

Workflow Orchestration Manager is powered by Apache Airflow.

Note

Workflow Orchestration Manager for Azure Data Factory relies on the open source Apache Airflow application. Documentation and more tutorials for Airflow can be found on the Apache Airflow Documentation or Community pages.

Data Factory pipelines provide 100+ data source connectors that provide scalable and reliable data integration/ data flows. There are scenarios where you would like to run an existing data factory pipeline from your Apache Airflow DAG. This tutorial shows you how to do just that.

Prerequisites

  • Azure subscription. If you don't have an Azure subscription, create a free Azure account before you begin.
  • Azure storage account. If you don't have a storage account, see Create an Azure storage account for steps to create one. Ensure the storage account allows access only from selected networks.
  • Azure Data Factory pipeline. You can follow any of the tutorials and create a new data factory pipeline in case you don't already have one, or create one with one select in Get started and try out your first data factory pipeline.
  • Setup a Service Principal. You'll need to create a new service principal or use an existing one and grant it permission to run the pipeline (example - contributor role in the data factory where the existing pipelines exist), even if the Workflow Orchestration Manager environment and the pipelines exist in the same data factory. You'll need to get the Service Principal’s Client ID and Client Secret (API Key).

Steps

  1. Create a new Python file adf.py with the below contents:

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    You'll have to create the connection using the Workflow Orchestration Manager UI Admin -> Connections -> '+' -> Choose 'Connection type' as 'Azure Data Factory', then fill in your client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name, and pipeline_name.

  2. Upload the adf.py file to your blob storage within a folder called DAGS.

  3. Import the DAGS folder into your Workflow Orchestration Manager environment. If you don't have one, create a new one

    Screenshot showing the data factory management tab with the Airflow section selected.