通过 Azure 数据工厂工作流编排管理器(由 Apache Airflow 提供支持)对 Apache Flink® 作业进行编排
重要
AKS 上的 Azure HDInsight 已于 2025 年 1 月 31 日停用。 通过此公告 了解更多信息。
需要将工作负荷迁移到 Microsoft Fabric 或等效的 Azure 产品,以避免工作负荷突然终止。
重要
此功能目前以预览版提供。 Microsoft Azure 预览计划的补充使用条款 包括适用于 beta 版、预览版或尚未正式发布的 Azure 功能的更多法律条款。 有关此特定预览的信息,请参阅 在 AKS 上的 Azure HDInsight 预览信息。 有关问题或功能建议,请在 AskHDInsight 上提交请求,并提供详细信息。关注我们以获取有关 Azure HDInsight 社区 的更多更新。
本文介绍如何使用 Azure REST API 管理 Flink 作业,并通过 Azure 数据工厂工作流编排管理器管理业务流程数据管道。 Azure 数据工厂工作流业务流程管理器 服务是创建和管理 Apache Airflow 环境的简单高效方法,使你能够轻松地大规模运行数据管道。
Apache Airflow 是一个开源平台,以编程方式创建、计划和监视复杂的数据工作流。 它允许你定义一组任务,称为运算符,这些运算符可以组合成有向无环图(DAG)来表示数据管道。
下图显示了在 Azure 的 AKS 上部署 Airflow、Key Vault 和 HDInsight。
基于范围创建多个 Azure 服务主体,以限制其所需的访问,并独立管理客户端凭据生命周期。
建议定期轮换访问密钥或机密。
设置步骤
将 Flink 作业 jar 上传到存储帐户。 它可以是与 Flink 群集或任何其他存储帐户关联的主存储帐户,在此存储帐户中应将“存储 Blob 数据所有者”角色分配给用于此存储帐户中的群集的用户分配的 MSI。
Azure Key Vault - 可以按照本教程 创建新的 Azure Key Vault(如果没有)。
创建 Microsoft Entra 服务主体 以访问 Key Vault —— 授予该主体“Key Vault 密钥官”角色,允许访问 Azure Key Vault,并记下响应中的“appId”“密码”和“租户”。 为了让 Airflow 将 Key Vault 存储用作存储敏感信息的后端,我们需要使用相同的方法。
az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID>
为工作流业务流程管理器 启用 Azure Key Vault,以安全集中的方式存储和管理敏感信息。 通过执行此作,可以使用变量和连接,这些变量和连接会自动存储在 Azure Key Vault 中。 连接和变量的名称需要以AIRFLOW__SECRETS__BACKEND_KWARGS中定义的variables_prefix为前缀。 例如,如果variables_prefix将值作为 hdinsight-aks-variables,则对于 hello 的变量键,需要将变量存储在 hdinsight-aks-variable -hello 中。
在集成运行时属性中,为 Airflow 配置重写添加以下设置:
AIRFLOW__SECRETS__BACKEND:
"airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"
AIRFLOW__SECRETS__BACKEND_KWARGS:
"{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”
为 Airflow 集成运行时属性中的环境变量配置添加以下设置:
AZURE_CLIENT_ID =
<App Id from Create Azure AD Service Principal>
AZURE_TENANT_ID =
<Tenant from Create Azure AD Service Principal>
AZURE_CLIENT_SECRET =
<Password from Create Azure AD Service Principal>
添加 Airflow 依赖项 - apache-airflow-providers-microsoft-azure
创建 Microsoft Entra 服务主体 访问 Azure – 授予访问具有参与者角色的 HDInsight AKS 群集的权限,并记下响应中的 appId、密码和租户。
az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>
在您的密钥保管库中创建以下机密:这些机密的值来自于之前的 AD 服务主体的 appId、密码和租户,其前缀由 AIRFLOW__SECRETS__BACKEND_KWARGS 中定义的属性变量前缀 variables_prefix。 DAG 代码无需variables_prefix即可访问其中任何变量。
hdinsight-aks-variables-api-client-id=
<App ID from previous step>
hdinsight-aks-variables-api-secret=
<Password from previous step>
hdinsight-aks-variables-tenant-id=
<Tenant from previous step>
from airflow.models import Variable def retrieve_variable_from_akv(): variable_value = Variable.get("client-id") print(variable_value)
DAG 定义
DAG(定向无环图)是 Airflow 的核心概念,它将任务集合起来,通过组织依赖关系和逻辑顺序,明确任务的执行方式。
有三种方法可以声明 DAG:
可以使用上下文管理器,它会将 DAG 隐式添加到其内部的所有内容中。
可以使用标准构造函数,将 DAG 传递到你使用的任何运算符
可以使用 @dag 修饰器将函数转换为 DAG 生成器(从 airflow.修饰器导入 dag)
没有任务可运行,DAG 就毫无意义,这些任务以操作符、传感器或 TaskFlow 的形式出现。
可以直接从 Apache Airflow阅读有关 DAG、控制流、SubDAG、TaskGroups 等的更多详细信息。
DAG 执行
git上提供了示例代码;在计算机上本地下载代码,并将 wordcount.py 上传到 Blob 存储。 按照 步骤 将 DAG 导入到安装过程中创建的工作流中。
wordcount.py 是一个使用 Apache Airflow 在 AKS 上结合 HDInsight 来管理 Flink 作业提交的示例。 DAG 有两个任务:
获取
OAuth Token
调用 HDInsight Flink 作业提交 Azure REST API 以提交新作业
DAG 需要为服务主体设置,如 OAuth 客户端凭据的安装过程中所述,并传递以下输入配置来执行。
执行步骤
在 Airflow 用户界面中执行 DAG,你可以通过点击“监视”图标打开 Azure 数据工厂的工作流编排管理界面。
从“DAGs”页中选择“FlinkWordCountExample”DAG。
单击右上角的“执行”图标,然后选择“使用配置触发 DAG”。
传递所需的配置 JSON文件
{ "jarName":"WordCount.jar", "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", "subscription":"<cluster subscription id>", "rg":"<cluster resource group>", "poolNm":"<cluster pool name>", "clusterNm":"<cluster name>" }
单击“触发器”按钮,启动 DAG 的执行。
您可以在 DAG 运行中可视化 DAG 任务的状态。
从门户中验证作业的执行
在“Apache Flink 仪表板”中验证作业
示例代码
这是在 AKS 上使用 Airflow 与 HDInsight 协调数据管道的一个示例。
DAG 预计将为 OAuth 客户端凭据设置服务主体,并传递以下用于执行的输入配置:
{
'jarName':'WordCount.jar',
'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net',
'subscription':'<cluster subscription id>',
'rg':'<cluster resource group>',
'poolNm':'<cluster pool name>',
'clusterNm':'<cluster name>'
}
参考
- 请参阅 示例代码。
- Apache Flink 网站
- Apache、Apache Airflow、Airflow、Apache Flink、Flink 和关联的开源项目名称 是 Apache Software Foundation(ASF) 商标。