你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

使用工作流编排管理器运行现有管道

适用于:Azure 数据工厂 Azure Synapse Analytics

提示

试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用

注意

工作流编排管理器由 Apache Airflow 提供支持。

注意

Azure 数据工厂的工作流编排管理器依赖于开源 Apache Airflow 应用程序。 可以在 Apache Airflow 文档社区页上找到有关 Airflow 的文档和更多教程。

数据工厂管道提供 100 多个数据源连接器,这些连接器提供可缩放且可靠的数据集成/数据流。 在某些情况下,你想要从 Apache Airflow DAG 运行现有的数据工厂管道。 本教程将展示如何做到这一点。

先决条件

  • Azure 订阅。 如果还没有 Azure 订阅,可以在开始前创建一个免费 Azure 帐户
  • Azure 存储帐户。 如果没有存储帐户,请参阅创建 Azure 存储帐户以获取创建步骤。 确保存储帐户仅允许来自选定的网络的访问。
  • Azure 数据工厂管道。 可以按照任何这些教程新建一个数据工厂管道(如果还没有),或者在开始试用第一个数据工厂管道中一键选择创建一个管道。
  • 设置服务主体。 需要创建一个新的服务主体或使用现有服务主体,并授予其运行管道的权限(例如,现有管道所在的数据工厂中的参与者角色),即使工作流编排管理器环境和管道存在于同一数据工厂中也是如此。 需要获取服务主体的客户端 ID 和客户端密码(API 密钥)。

步骤

  1. 使用以下内容新建一个 Python 文件 adf.py:

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    必须使用工作流编排管理器 UI“管理员”->“连接”->“+”-> 选择“连接类型”作为“Azure 数据工厂”创建连接,然后填写 client_id、client_secret、tenant_id、subscription_id、resource_group_name、data_factory_name 和 pipeline_name

  2. 将 adf.py 文件上传到名为 DAGS 的文件夹中的 blob 存储。

  3. 将 DAGS 文件夹导入工作流编排管理器环境。 如果没有,请新建一个

    显示“数据工厂管理”选项卡的屏幕截图,其中选择了“Airflow”部分。