Freigeben über


Ausführen einer vorhandenen Pipeline mit Workflow Orchestration Manager

GILT FÜR: Azure Data Factory Azure Synapse Analytics

Tipp

Testen Sie Data Factory in Microsoft Fabric, eine All-in-One-Analyselösung für Unternehmen. Microsoft Fabric deckt alle Aufgaben ab, von der Datenverschiebung bis hin zu Data Science, Echtzeitanalysen, Business Intelligence und Berichterstellung. Erfahren Sie, wie Sie kostenlos eine neue Testversion starten!

Hinweis

Workflow Orchestration Manager wird von Apache Airflow unterstützt.

Hinweis

Workflow Orchestration Manager für Azure Data Factory basiert auf der Open-Source-Anwendung Apache Airflow. Die Dokumentation und weitere Tutorials zu Airflow finden Sie in der Dokumentation zu Apache Airflow oder auf den Communityseiten.

Data Factory-Pipelines bieten mehr als 100 Datenquellenconnectors, die skalierbare und zuverlässige Datenintegrationen/Datenflüsse bieten. Es gibt Szenarien, in denen Sie eine vorhandene Data Factory-Pipeline über Ihren Apache Airflow-DAG ausführen möchten. Dieses Tutorial veranschaulicht die Vorgehensweise.

Voraussetzungen

  • Azure-Abonnement. Wenn Sie über kein Azure-Abonnement verfügen, können Sie ein kostenloses Azure-Konto erstellen, bevor Sie beginnen.
  • Azure-Speicherkonto. Wenn Sie kein Speicherkonto besitzen, finden Sie unter Informationen zu Azure-Speicherkonten Schritte zum Erstellen eines solchen Kontos. Stellen Sie sicher, dass das Speicherkonto nur den Zugriff über ausgewählte Netzwerke zulässt.
  • Azure Data Factory-Pipeline. Sie können einem der Tutorials folgen und eine neue Data Factory-Pipeline erstellen, falls Sie noch keine haben. Alternativ können Sie unter Erste Schritte mit Azure Data Factory mit nur einer Auswahl eine erstellen.
  • Einrichten eines Dienstprinzipals. Sie müssen einen neuen Dienstprinzipal erstellen oder einen vorhandenen verwenden und ihm die Berechtigung zum Ausführen der Pipeline erteilen (z. B. die Rolle „Mitwirkender“ in der Data Factory mit den bereits vorhandenen Pipelines), auch wenn sich die Workflow Orchestration Manager-Umgebung und die Pipelines in derselben Data Factory befinden. Sie müssen die Client-ID und den geheimen Clientschlüssel (API-Schlüssel) des Dienstprinzipals abrufen.

Schritte

  1. Erstellen Sie eine neue Python-Datei vom Typ adf.py mit dem folgenden Inhalt:

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    Sie müssen die Verbindung über die Benutzeroberfläche von Workflow Orchestration Manager: „Administrator“ > „Verbindungen“ > „+“ > Wählen Sie als „Verbindungstyp“ die Option „Azure Data Factory“ aus, und geben Sie dann Ihre Werte für client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name und pipeline_name ein.

  2. Laden Sie die Datei adf.py in Ihren Blobspeicher in einen Ordner namens DAGS hoch.

  3. Importieren Sie den Ordner DAGS in Ihre Workflow Orchestration Manager-Umgebung. Falls Sie noch keine besitzen, erstellen Sie eine neue

    Screenshot: Registerkarte für die Data Factory-Verwaltung, auf der der Abschnitt „Airflow“ ausgewählt ist