Оркестрация заданий Apache Flink® с помощью диспетчера оркестрации рабочих процессов фабрики данных Azure (на основе Apache Airflow)
Важный
Azure HDInsight на AKS выведен из эксплуатации 31 января 2025 г. Узнайте больше из этого объявления.
Необходимо перенести рабочие нагрузки в Microsoft Fabric или эквивалентный продукт Azure, чтобы избежать резкого завершения рабочих нагрузок.
Важный
Эта функция сейчас доступна в предварительной версии. Дополнительные условия использования для предварительных версий Microsoft Azure включают дополнительные юридические термины, применимые к функциям Azure, которые находятся в бета-версии, в предварительной версии или в противном случае еще не выпущены в общую доступность. Сведения об этой конкретной предварительной версии см. в Azure HDInsight в предварительной версии AKS. Для вопросов или предложений функций отправьте запрос на AskHDInsight с подробными сведениями и следуйте за дополнительными обновлениями в Azure HDInsight Community.
В этой статье рассматривается управление задачей Flink с использованием REST API Azure и оркестрация конвейера данных с помощью диспетчера рабочих процессов в Azure Data Factory. Менеджер оркестрации рабочих процессов Azure Data Factory — это простой и эффективный способ создания и управления средами Apache Airflow, позволяющий легко запускать конвейеры данных в большом масштабе.
Apache Airflow — это платформа с открытым исходным кодом, которая программно создает, планирует и отслеживает сложные рабочие процессы данных. Он позволяет определить набор задач, называемых операторами, которые можно объединить в ациклические графы (DAG) для представления конвейеров данных.
На следующей схеме показано размещение Airflow, Key Vault и HDInsight в AKS в Azure.
Несколько субъектов-служб Azure создаются в зависимости от области, чтобы ограничить доступ к ней и управлять жизненным циклом учетных данных клиента независимо.
Рекомендуется периодически менять ключи доступа или секреты.
Действия по настройке
Загрузите jar-файл Flink Job в учетную запись хранения. Это может быть основная учетная запись хранения, связанная с кластером Flink, или любая другая учетная запись хранения, в которой необходимо назначить роль "Владелец данных BLOB-объектов хранилища" пользовательскому назначенному MSI, используемому для кластера.
Azure Key Vault. Вы можете следовать этому руководству, чтобы создать новый Azure Key Vault, в случае если у вас его нет.
Создайте учетную запись службы Microsoft Entra для доступа к Key Vault — предоставьте разрешение на доступ к Azure Key Vault с ролью "Сотрудник по секретам Key Vault" и сделайте заметку о 'appId', 'пароль' и 'tenant' из ответа. Мы должны использовать то же самое для Airflow, чтобы использовать хранилище Key Vault в качестве серверных элементов для хранения конфиденциальной информации.
az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID>
Включите Azure Key Vault для диспетчера оркестрации рабочих процессов, чтобы хранить и управлять вашими конфиденциальными данными в безопасном и централизованном порядке. Для этого можно использовать переменные и подключения, которые автоматически сохраняются в Azure Key Vault. Имя подключений и переменных должно иметь префикс variables_prefix, который указан в AIRFLOW__SECRETS__BACKEND_KWARGS. Например, если variables_prefix имеет значение hdinsight-aks-variables, то для ключа переменной hello необходимо сохранить переменную в hdinsight-aks-variables-hello.
Добавьте следующие параметры для переопределения конфигурации Airflow в встроенных свойствах среды выполнения:
AIRFLOW__SECRETS__BACKEND:
"airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"
AIRFLOW__SECRETS__BACKEND_KWARGS:
"{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”
Добавьте следующий параметр для конфигурации переменных среды в свойствах интегрированной среды выполнения Airflow:
AZURE_CLIENT_ID =
<App Id from Create Azure AD Service Principal>
AZURE_TENANT_ID =
<Tenant from Create Azure AD Service Principal>
AZURE_CLIENT_SECRET =
<Password from Create Azure AD Service Principal>
Добавить требования к Airflow — apache-airflow-providers-microsoft-azure
Создайте учетную запись службы Microsoft Entra для доступа к Azure — предоставьте разрешение с ролью участника для доступа к кластеру HDInsight AKS, и запишите идентификатор приложения, пароль и арендатор из ответа.
az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>
Создайте следующие секреты в вашем хранилище ключей, используя значение из предыдущих appId, пароля и арендатора главного объекта службы AD, к которым добавлен префикс, заданный в переменной property variables_prefix, определенной в AIRFLOW__SECRETS__BACKEND_KWARGS. Код DAG может получить доступ к любой из этих переменных без variables_prefix.
hdinsight-aks-variables-api-client-id=
<App ID from previous step>
hdinsight-aks-переменные-api-secret=
<Password from previous step>
hdinsight-aks-variables-tenant-id=
<Tenant from previous step>
from airflow.models import Variable def retrieve_variable_from_akv(): variable_value = Variable.get("client-id") print(variable_value)
Определение DAG
DAG (направленный ациклический граф) — это основная концепция Airflow, объединение задач, организованное с зависимостями и связями, определяющая, как они должны выполняться.
Существует три способа задания DAG:
Вы можете использовать диспетчер контекстов, который добавляет DAG в любое содержимое внутри него неявно.
Вы можете использовать стандартный конструктор, передав DAG в любые операторы, которые вы используете.
Вы можете использовать декоратор @dag, чтобы превратить функцию в генератор DAG (из airflow.decorators импортировать dag)
DAG не имеют значения без выполнения задач, которые предоставляются операторами, датчиками или TaskFlow.
Дополнительные сведения о графах, управляющих потоках, подграфах, группах задач и т. д. можно узнать непосредственно из Apache Airflow.
Выполнение DAG
Пример кода доступен на Git; скачайте код локально на компьютере и отправьте wordcount.py в хранилище BLOB-объектов. Выполните действия , чтобы импортировать DAG в рабочий процесс, созданный во время установки.
Файл wordcount.py является примером оркестрации отправки задания Flink с использованием Apache Airflow и HDInsight на AKS. DAG имеет две задачи:
получение
OAuth Token
Используйте REST API Azure для отправки задания в HDInsight Flink, чтобы выполнить новое задание.
Ожидается, что DAG будет настроен для учетной записи службы, как описано в процессе настройки учетных данных клиента OAuth, и передаст следующие входные данные конфигурации для выполнения.
Действия по выполнению
Выполните DAG из пользовательского интерфейса Airflow, после чего вы можете открыть пользовательский интерфейс диспетчера оркестрации рабочих процессов фабрики данных Azure, нажав на значок "Монитор".
Выберите DAG "FlinkWordCountExample" на странице "DAGs".
Щелкните значок "Выполнить" в правом верхнем углу и выберите "Запустить DAG с конфигурацией".
Передача необходимой конфигурации JSON
{ "jarName":"WordCount.jar", "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", "subscription":"<cluster subscription id>", "rg":"<cluster resource group>", "poolNm":"<cluster pool name>", "clusterNm":"<cluster name>" }
Нажмите кнопку "Триггер", она запускает выполнение DAG.
Вы можете визуализировать состояние задач DAG из запуска DAG
Проверка выполнения задания с портала
Проверка задания на панели мониторинга Apache Flink
Пример кода
Это пример организации потока данных с помощью Airflow с HDInsight на AKS.
DaG ожидает установки субъекта-службы для учетных данных клиента OAuth и передает следующую входную конфигурацию для выполнения:
{
'jarName':'WordCount.jar',
'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net',
'subscription':'<cluster subscription id>',
'rg':'<cluster resource group>',
'poolNm':'<cluster pool name>',
'clusterNm':'<cluster name>'
}
Ссылка
- См. пример кода .
- веб-сайт Apache Flink
- Apache, Apache Airflow, Airflow, Apache Flink, Flink и связанные имена проектов с открытым исходным кодом являются товарных знаков Apache Software Foundation (ASF).