使用工作流程協調管理員執行現有的管線
適用於:Azure Data Factory Azure Synapse Analytics
提示
試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用!
注意
工作流程協調管理員是由 Apache Airflow 提供。
注意
Azure Data Factory 的工作流程協調管理員依賴開放原始碼 Apache Airflow 應用程式。 您可以在 Apache Airflow 文件或社群頁面上找到 Airflow 的文件和更多教學課程。
Data Factory 管線提供 100 個以上的資料來源連接器,這些連接器提供可調整且可靠的資料整合/資料流程。 在某些情況下,您會想要從 Apache Airflow DAG 執行現有的資料處理站管線。 本教學課程說明該如何執行此作業。
必要條件
- Azure 訂用帳戶。 如果您沒有 Azure 訂用帳戶,請在開始前建立免費 Azure 帳戶。
- Azure 儲存體帳戶。 如果您沒有儲存體帳戶,請參閱建立 Azure 儲存體帳戶,按照步驟建立此帳戶。 請確定儲存體帳戶只允許從選取的網路存取。
- Azure Data Factory 管線。 如果您還沒有資料處理站管線,則可以遵循任何教學課程並新建資料處理站管線,或遵循開始使用並試用第一個資料處理站管線中簡單的步驟以加以新建。
- 設定服務主體。 您必須建立新的服務主體或使用現有的服務主體,並授與它執行管線的許可權(例如 - 現有管線存在之數據處理站中的參與者角色),即使工作流程協調流程管理員環境和管線存在於相同的數據處理站也一樣。 您必須取得服務主體的用戶端識別碼和用戶端密碼 (API 金鑰)。
步驟
使用下列內容新建 Python 檔案 adf.py:
from datetime import datetime, timedelta from airflow.models import DAG, BaseOperator try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from airflow.utils.edgemodifier import Label with DAG( dag_id="example_adf_run_pipeline", start_date=datetime(2022, 5, 14), schedule_interval="@daily", catchup=False, default_args={ "retries": 1, "retry_delay": timedelta(minutes=3), "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI "factory_name": "<FactoryName>", # This can also be specified in the ADF connection. "resource_group_name": "<ResourceGroupName>", # This can also be specified in the ADF connection. }, default_view="graph", ) as dag: begin = EmptyOperator(task_id="begin") end = EmptyOperator(task_id="end") # [START howto_operator_adf_run_pipeline] run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline1", pipeline_name="<PipelineName>", parameters={"myParam": "value"}, ) # [END howto_operator_adf_run_pipeline] # [START howto_operator_adf_run_pipeline_async] run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline2", pipeline_name="<PipelineName>", wait_for_termination=False, ) pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor( task_id="pipeline_run_sensor", run_id=run_pipeline2.output["run_id"], ) # [END howto_operator_adf_run_pipeline_async] begin >> Label("No async wait") >> run_pipeline1 begin >> Label("Do async wait with sensor") >> run_pipeline2 [run_pipeline1, pipeline_run_sensor] >> end # Task dependency created via `XComArgs`: # run_pipeline2 >> pipeline_run_sensor
您必須使用工作流程協調流程管理員 UI 管理員 - 連線 -> '+' -> 選擇 [聯機類型] 作為 [Azure Data Factory],然後填入您的client_id、client_secret、tenant_id、subscription_id、resource_group_name、data_factory_name和pipeline_name。>
將 adf.py 檔案上傳至名為 DAGS 資料夾內的 Blob 儲存體。
將 DAGS 資料夾匯入工作流程協調流程管理員環境。 如果您還沒有工作流程協調管理員環境,請加以新建