你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
使用工作流编排管理器运行现有管道
适用于:Azure 数据工厂 Azure Synapse Analytics
提示
试用 Microsoft Fabric 中的数据工厂,这是一种适用于企业的一站式分析解决方案。 Microsoft Fabric 涵盖从数据移动到数据科学、实时分析、商业智能和报告的所有内容。 了解如何免费开始新的试用!
注意
工作流编排管理器由 Apache Airflow 提供支持。
数据工厂管道提供 100 多个数据源连接器,这些连接器提供可缩放且可靠的数据集成/数据流。 在某些情况下,你想要从 Apache Airflow DAG 运行现有的数据工厂管道。 本教程将展示如何做到这一点。
先决条件
- Azure 订阅。 如果还没有 Azure 订阅,可以在开始前创建一个免费 Azure 帐户。
- Azure 存储帐户。 如果没有存储帐户,请参阅创建 Azure 存储帐户以获取创建步骤。 确保存储帐户仅允许来自选定的网络的访问。
- Azure 数据工厂管道。 可以按照任何这些教程新建一个数据工厂管道(如果还没有),或者在开始试用第一个数据工厂管道中一键选择创建一个管道。
- 设置服务主体。 需要创建一个新的服务主体或使用现有服务主体,并授予其运行管道的权限(例如,现有管道所在的数据工厂中的参与者角色),即使工作流编排管理器环境和管道存在于同一数据工厂中也是如此。 需要获取服务主体的客户端 ID 和客户端密码(API 密钥)。
步骤
使用以下内容新建一个 Python 文件 adf.py:
from datetime import datetime, timedelta from airflow.models import DAG, BaseOperator try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from airflow.utils.edgemodifier import Label with DAG( dag_id="example_adf_run_pipeline", start_date=datetime(2022, 5, 14), schedule_interval="@daily", catchup=False, default_args={ "retries": 1, "retry_delay": timedelta(minutes=3), "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI "factory_name": "<FactoryName>", # This can also be specified in the ADF connection. "resource_group_name": "<ResourceGroupName>", # This can also be specified in the ADF connection. }, default_view="graph", ) as dag: begin = EmptyOperator(task_id="begin") end = EmptyOperator(task_id="end") # [START howto_operator_adf_run_pipeline] run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline1", pipeline_name="<PipelineName>", parameters={"myParam": "value"}, ) # [END howto_operator_adf_run_pipeline] # [START howto_operator_adf_run_pipeline_async] run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline2", pipeline_name="<PipelineName>", wait_for_termination=False, ) pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor( task_id="pipeline_run_sensor", run_id=run_pipeline2.output["run_id"], ) # [END howto_operator_adf_run_pipeline_async] begin >> Label("No async wait") >> run_pipeline1 begin >> Label("Do async wait with sensor") >> run_pipeline2 [run_pipeline1, pipeline_run_sensor] >> end # Task dependency created via `XComArgs`: # run_pipeline2 >> pipeline_run_sensor
必须使用工作流编排管理器 UI“管理员”->“连接”->“+”-> 选择“连接类型”作为“Azure 数据工厂”创建连接,然后填写 client_id、client_secret、tenant_id、subscription_id、resource_group_name、data_factory_name 和 pipeline_name。
将 adf.py 文件上传到名为 DAGS 的文件夹中的 blob 存储。
将 DAGS 文件夹导入工作流编排管理器环境。 如果没有,请新建一个