Поделиться через


Оркестрация заданий Apache Flink® с помощью диспетчера оркестрации рабочих процессов фабрики данных Azure (на основе Apache Airflow)

Важный

Azure HDInsight на AKS выведен из эксплуатации 31 января 2025 г. Узнайте больше из этого объявления.

Необходимо перенести рабочие нагрузки в Microsoft Fabric или эквивалентный продукт Azure, чтобы избежать резкого завершения рабочих нагрузок.

Важный

Эта функция сейчас доступна в предварительной версии. Дополнительные условия использования для предварительных версий Microsoft Azure включают дополнительные юридические термины, применимые к функциям Azure, которые находятся в бета-версии, в предварительной версии или в противном случае еще не выпущены в общую доступность. Сведения об этой конкретной предварительной версии см. в Azure HDInsight в предварительной версии AKS. Для вопросов или предложений функций отправьте запрос на AskHDInsight с подробными сведениями и следуйте за дополнительными обновлениями в Azure HDInsight Community.

В этой статье рассматривается управление задачей Flink с использованием REST API Azure и оркестрация конвейера данных с помощью диспетчера рабочих процессов в Azure Data Factory.  Менеджер оркестрации рабочих процессов Azure Data Factory — это простой и эффективный способ создания и управления средами Apache Airflow, позволяющий легко запускать конвейеры данных в большом масштабе.

Apache Airflow — это платформа с открытым исходным кодом, которая программно создает, планирует и отслеживает сложные рабочие процессы данных. Он позволяет определить набор задач, называемых операторами, которые можно объединить в ациклические графы (DAG) для представления конвейеров данных.

На следующей схеме показано размещение Airflow, Key Vault и HDInsight в AKS в Azure.

снимок экрана, показывающий размещение Airflow, хранилища ключей и HDInsight на AKS в Azure.

Несколько субъектов-служб Azure создаются в зависимости от области, чтобы ограничить доступ к ней и управлять жизненным циклом учетных данных клиента независимо.

Рекомендуется периодически менять ключи доступа или секреты.

Действия по настройке

  1. Настройка кластера Flink

  2. Загрузите jar-файл Flink Job в учетную запись хранения. Это может быть основная учетная запись хранения, связанная с кластером Flink, или любая другая учетная запись хранения, в которой необходимо назначить роль "Владелец данных BLOB-объектов хранилища" пользовательскому назначенному MSI, используемому для кластера.

  3. Azure Key Vault. Вы можете следовать этому руководству, чтобы создать новый Azure Key Vault, в случае если у вас его нет.

  4. Создайте учетную запись службы Microsoft Entra для доступа к Key Vault — предоставьте разрешение на доступ к Azure Key Vault с ролью "Сотрудник по секретам Key Vault" и сделайте заметку о 'appId', 'пароль' и 'tenant' из ответа. Мы должны использовать то же самое для Airflow, чтобы использовать хранилище Key Vault в качестве серверных элементов для хранения конфиденциальной информации.

    az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID> 
    
  5. Включите Azure Key Vault для диспетчера оркестрации рабочих процессов, чтобы хранить и управлять вашими конфиденциальными данными в безопасном и централизованном порядке. Для этого можно использовать переменные и подключения, которые автоматически сохраняются в Azure Key Vault. Имя подключений и переменных должно иметь префикс variables_prefix, который указан в AIRFLOW__SECRETS__BACKEND_KWARGS. Например, если variables_prefix имеет значение hdinsight-aks-variables, то для ключа переменной hello необходимо сохранить переменную в hdinsight-aks-variables-hello.

    • Добавьте следующие параметры для переопределения конфигурации Airflow в встроенных свойствах среды выполнения:

      • AIRFLOW__SECRETS__BACKEND: "airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"

      • AIRFLOW__SECRETS__BACKEND_KWARGS:
        "{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”

    • Добавьте следующий параметр для конфигурации переменных среды в свойствах интегрированной среды выполнения Airflow:

      • AZURE_CLIENT_ID = <App Id from Create Azure AD Service Principal>

      • AZURE_TENANT_ID = <Tenant from Create Azure AD Service Principal>

      • AZURE_CLIENT_SECRET = <Password from Create Azure AD Service Principal>

      Добавить требования к Airflow — apache-airflow-providers-microsoft-azure

      снимок экрана: конфигурация воздушных потоков и переменные среды.

  6. Создайте учетную запись службы Microsoft Entra для доступа к Azure — предоставьте разрешение с ролью участника для доступа к кластеру HDInsight AKS, и запишите идентификатор приложения, пароль и арендатор из ответа.

    az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>

  7. Создайте следующие секреты в вашем хранилище ключей, используя значение из предыдущих appId, пароля и арендатора главного объекта службы AD, к которым добавлен префикс, заданный в переменной property variables_prefix, определенной в AIRFLOW__SECRETS__BACKEND_KWARGS. Код DAG может получить доступ к любой из этих переменных без variables_prefix.

    • hdinsight-aks-variables-api-client-id=<App ID from previous step>

    • hdinsight-aks-переменные-api-secret=<Password from previous step>

    • hdinsight-aks-variables-tenant-id=<Tenant from previous step>

    from airflow.models import Variable 
    
    def retrieve_variable_from_akv(): 
    
        variable_value = Variable.get("client-id") 
    
        print(variable_value) 
    

Определение DAG

DAG (направленный ациклический граф) — это основная концепция Airflow, объединение задач, организованное с зависимостями и связями, определяющая, как они должны выполняться.

Существует три способа задания DAG:

  1. Вы можете использовать диспетчер контекстов, который добавляет DAG в любое содержимое внутри него неявно.

  2. Вы можете использовать стандартный конструктор, передав DAG в любые операторы, которые вы используете.

  3. Вы можете использовать декоратор @dag, чтобы превратить функцию в генератор DAG (из airflow.decorators импортировать dag)

DAG не имеют значения без выполнения задач, которые предоставляются операторами, датчиками или TaskFlow.

Дополнительные сведения о графах, управляющих потоках, подграфах, группах задач и т. д. можно узнать непосредственно из Apache Airflow. 

Выполнение DAG

Пример кода доступен на Git; скачайте код локально на компьютере и отправьте wordcount.py в хранилище BLOB-объектов. Выполните действия , чтобы импортировать DAG в рабочий процесс, созданный во время установки.

Файл wordcount.py является примером оркестрации отправки задания Flink с использованием Apache Airflow и HDInsight на AKS. DAG имеет две задачи:

  • получение OAuth Token

  • Используйте REST API Azure для отправки задания в HDInsight Flink, чтобы выполнить новое задание.

Ожидается, что DAG будет настроен для учетной записи службы, как описано в процессе настройки учетных данных клиента OAuth, и передаст следующие входные данные конфигурации для выполнения.

Действия по выполнению

  1. Выполните DAG из пользовательского интерфейса Airflow, после чего вы можете открыть пользовательский интерфейс диспетчера оркестрации рабочих процессов фабрики данных Azure, нажав на значок "Монитор".

    Снимок экрана показывает, как открыть интерфейс диспетчера оркестрации рабочих процессов в Azure Data Factory, нажав на иконку монитора.

  2. Выберите DAG "FlinkWordCountExample" на странице "DAGs".

    снимок экрана, показывающий выбор примера подсчёта слов в Flink.

  3. Щелкните значок "Выполнить" в правом верхнем углу и выберите "Запустить DAG с конфигурацией".

    снимок экрана: значок выполнения.

  4. Передача необходимой конфигурации JSON

    { 
    
      "jarName":"WordCount.jar", 
    
      "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", 
    
      "subscription":"<cluster subscription id>", 
    
      "rg":"<cluster resource group>", 
    
      "poolNm":"<cluster pool name>", 
    
      "clusterNm":"<cluster name>" 
    
    } 
    
  5. Нажмите кнопку "Триггер", она запускает выполнение DAG.

  6. Вы можете визуализировать состояние задач DAG из запуска DAG

    снимок экрана: состояние задачи DAG.

  7. Проверка выполнения задания с портала

    снимок экрана: проверка выполнения задания.

  8. Проверка задания на панели мониторинга Apache Flink

    Снимок экрана показывает панель мониторинга Apache Flink.

Пример кода

Это пример организации потока данных с помощью Airflow с HDInsight на AKS.

DaG ожидает установки субъекта-службы для учетных данных клиента OAuth и передает следующую входную конфигурацию для выполнения:

{
 'jarName':'WordCount.jar',
 'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net', 
 'subscription':'<cluster subscription id>',
 'rg':'<cluster resource group>', 
 'poolNm':'<cluster pool name>',
 'clusterNm':'<cluster name>'
 }

Ссылка