Поделиться через


Запуск существующего конвейера с помощью диспетчера оркестрации рабочих процессов

ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

Примечание.

Диспетчер оркестрации рабочих процессов управляется Apache Airflow.

Примечание.

Диспетчер оркестрации рабочих процессов для Фабрика данных Azure зависит от приложения Apache Airflow открытый код. Документация и дополнительные руководства по Airflow можно найти на страницах документации по Apache Airflow или сообщества.

Конвейеры фабрики данных предоставляют 100+ соединители источников данных, которые обеспечивают масштабируемую и надежную интеграцию данных/ потоки данных. Существуют сценарии, в которых вы хотите запустить существующий конвейер фабрики данных из DAG Apache Airflow. В этом руководстве показано, как это сделать.

Необходимые компоненты

  • Подписка Azure. Если у вас еще нет подписки Azure, создайте бесплатную учетную запись Azure, прежде чем начинать работу.
  • Учетная запись хранения Azure. Если у вас нет учетной записи хранения, создайте ее, следуя действиям в этом разделе. Убедитесь, что получить доступ к учетной записи хранения можно только из выбранных сетей.
  • конвейер Фабрика данных Azure. Вы можете следовать любому из учебников и создать новый конвейер фабрики данных в случае, если у вас еще нет одного, или создать его с одним нажатием кнопки "Начать работу" и попробовать первый конвейер фабрики данных.
  • Настройка субъекта-службы. Необходимо создать субъект-службу или использовать существующий и предоставить ему разрешение на запуск конвейера (например, роль участника в фабрике данных, где существуют существующие конвейеры), даже если среда Диспетчера рабочих процессов и конвейеры существуют в той же фабрике данных. Вам потребуется получить идентификатор клиента и секрет клиента субъекта-службы (ключ API).

Шаги

  1. Создайте новый файл Python adf.py со следующим содержимым:

    from datetime import datetime, timedelta
    
    from airflow.models import DAG, BaseOperator
    
    try:
        from airflow.operators.empty import EmptyOperator
    except ModuleNotFoundError:
        from airflow.operators.dummy import DummyOperator as EmptyOperator  # type: ignore
    from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator
    from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor
    from airflow.utils.edgemodifier import Label
    
    with DAG(
        dag_id="example_adf_run_pipeline",
        start_date=datetime(2022, 5, 14),
        schedule_interval="@daily",
        catchup=False,
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
            "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI
            "factory_name": "<FactoryName>",  # This can also be specified in the ADF connection.
            "resource_group_name": "<ResourceGroupName>",  # This can also be specified in the ADF connection.
        },
        default_view="graph",
    ) as dag:
        begin = EmptyOperator(task_id="begin")
        end = EmptyOperator(task_id="end")
    
        # [START howto_operator_adf_run_pipeline]
        run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline1",
            pipeline_name="<PipelineName>",
            parameters={"myParam": "value"},
        )
        # [END howto_operator_adf_run_pipeline]
    
        # [START howto_operator_adf_run_pipeline_async]
        run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator(
            task_id="run_pipeline2",
            pipeline_name="<PipelineName>",
            wait_for_termination=False,
        )
    
        pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor(
            task_id="pipeline_run_sensor",
            run_id=run_pipeline2.output["run_id"],
        )
        # [END howto_operator_adf_run_pipeline_async]
    
        begin >> Label("No async wait") >> run_pipeline1
        begin >> Label("Do async wait with sensor") >> run_pipeline2
        [run_pipeline1, pipeline_run_sensor] >> end
    
        # Task dependency created via `XComArgs`:
        #   run_pipeline2 >> pipeline_run_sensor
    

    Вам потребуется создать подключение с помощью администратора пользовательского интерфейса Диспетчера рабочих процессов — подключения -> "+" —> выберите "Тип подключения" как "Фабрика данных Azure", а затем заполните client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name и pipeline_name.>

  2. Отправьте файл adf.py в хранилище BLOB-объектов в папке с именем DAGS.

  3. Импортируйте папку DAGS в среду Диспетчера оркестрации рабочих процессов. Если у вас его нет, создайте новый.

    Снимок экрана: вкладка управления фабрикой данных с выбранным разделом Airflow.