使用 Azure Data Factory 工作流程協調管理員(由 Apache Airflow 驅動)的 Apache Flink® 作業協調
重要
AKS 上的 Azure HDInsight 於 2025 年 1 月 31 日淘汰。 透過此公告 深入瞭解。
您必須將工作負載移轉至 Microsoft Fabric 或對等 Azure 產品,以避免突然終止工作負載。
重要
這項功能目前為預覽狀態。 Microsoft Azure 預覽版的補充使用規定 包含適用於 Beta 版、預覽版或尚未正式發行之 Azure 功能的更合法條款。 如需此特定預覽的相關資訊,請參閱 Azure HDInsight 在 AKS 預覽資訊。 如有問題或功能建議,請在 AskHDInsight 提交請求,並關注我們以獲取 Azure HDInsight 社群的更多更新。
本文涵蓋使用 Azure REST API 並透過 Azure Data Factory 工作流程協調管理器來管理 Flink 作業。 Azure Data Factory 工作流程協調流程管理員 服務是建立和管理 Apache Airflow 環境的簡單且有效率的方式,可讓您輕鬆地大規模執行數據管線。
Apache Airflow 是開放原始碼平臺,以程式設計方式建立、排程及監視複雜的數據工作流程。 它可讓您定義一組稱為運算子的工作,這些運算符可以合併成有向無循環圖表(DAG)來代表數據管線。
下圖顯示 Azure 中 AKS 上的 Airflow、Key Vault 和 HDInsight 位置。
系統會根據範圍來建立多個 Azure 服務主體,以限制其所需的存取權,以及獨立管理客戶端認證生命週期。
建議定期輪替存取密鑰或秘密。
安裝步驟
將您的 Flink 任務 jar 上傳至儲存帳戶。 它可以是與 Flink 叢集或任何其他記憶體帳戶相關聯的主要記憶體帳戶,您應該在此記憶體帳戶中將「記憶體 Blob 數據擁有者」角色指派給用於此儲存器帳戶中叢集的使用者指派 MSI。
Azure Key Vault - 如果您沒有 Azure Key Vault,您可以遵循本教學課程 建立新的 Azure Key Vault。
建立 Microsoft Entra 服務主體 存取 Key Vault – 授與許可權以使用「Key Vault 秘密人員」角色存取 Azure Key Vault,並記下回應中的 'appId' 'password' 和 'tenant'。 我們需要使用相同的方法,讓 Airflow 使用 Key Vault 儲存庫作為儲存敏感資訊的後端。
az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID>
為工作流程協調流程管理員 啟用 Azure Key Vault,以安全且集中的方式儲存和管理您的敏感性資訊。 如此一來,您就可以使用變數和連線,並自動儲存在 Azure Key Vault 中。 連接和變數的名稱需要加上在AIRFLOW__SECRETS__BACKEND_KWARGS中定義的variables_prefix作為前綴。 例如,如果variables_prefix具有 hdinsight-aks-variables 的值,則針對 hello 的變數索引鍵,您會想要將變數儲存在 hdinsight-aks-variable -hello。
在整合式執行時間屬性中,新增下列 Airflow 組態覆蓋設定:
AIRFLOW__SECRETS__BACKEND:
"airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"
AIRFLOW__SECRETS__BACKEND_KWARGS:
"{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”
在 Airflow 整合執行時間屬性中新增環境變數組態的下列設定:
AZURE_CLIENT_ID =
<App Id from Create Azure AD Service Principal>
AZURE_TENANT_ID =
<Tenant from Create Azure AD Service Principal>
AZURE_CLIENT_SECRET =
<Password from Create Azure AD Service Principal>
新增 Airflow 需求 - apache-airflow-providers-microsoft-azure
建立 Microsoft Entra 服務主體 以存取 Azure - 授予許可權以存取具有貢獻者角色的 HDInsight AKS 叢集,並記下回應中的 appId、密碼和租戶。
az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>
在金鑰保存庫中建立下列秘密,含有先前 AD 服務主體的 appId、密碼和租戶的值,並在前面加上由 AIRFLOW__SECRETS__BACKEND_KWARGS 定義的屬性 variables_prefix。 DAG 程式代碼可以存取任何這些變數,而不需要variables_prefix。
hdinsight-aks-variables-api-client-id=
<App ID from previous step>
hdinsight-aks-variables-api-secret=
<Password from previous step>
hdinsight-aks-variables-tenant-id=
<Tenant from previous step>
from airflow.models import Variable def retrieve_variable_from_akv(): variable_value = Variable.get("client-id") print(variable_value)
DAG 定義
DAG(有向無循環圖)是 Airflow 的核心概念,將任務一起組織起來,並透過依賴和關聯說明其應該如何運行。
宣告 DAG 的三種方式有三種:
您可以使用上下文管理器,這會以隱含方式將 DAG 新增到其範圍內的所有內容。
您可以使用標準建構函式,將 DAG 傳遞至您使用的任何運算元
您可以使用 @dag 裝飾器,將函式轉換成 DAG 生成器(從 airflow.decorators 匯入 dag)
DAG 沒有任務是無法運作的,而這些任務會以操作器、感測器或 TaskFlow 的形式出現。
您可以直接從apache Airflow ,閱讀有關DAG、控制流程、SubDAG、TaskGroups等的詳細數據。
DAG 執行
git提供範例程序代碼;在本機計算機上下載程序代碼,並將 wordcount.py 上傳至 Blob 記憶體。 請遵循 步驟,將 DAG 匯入到您在安裝期間建立的工作流程。
wordcount.py 是使用 Apache Airflow 搭配 AKS 上的 HDInsight 協調 Flink 作業提交的範例。 DAG 有兩項工作:
取得
OAuth Token
呼叫 HDInsight Flink 作業提交的 Azure REST API 以提交新作業
DAG 預期需要設定服務主體,如在 OAuth 客戶端憑證的設置過程中所述,並需傳遞以下輸入配置以便執行。
執行步驟
從 Airflow UI執行 DAG,您可以按一下 [監視] 圖示以開啟 Azure Data Factory 工作流程協調系統管理員 UI。
在「DAGs」頁面中選擇「FlinkWordCountExample」DAG。
按兩下右上角的 [執行] 圖示,然後選取 [觸發DAG w/ config]。
傳遞必要的設定 JSON
{ "jarName":"WordCount.jar", "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", "subscription":"<cluster subscription id>", "rg":"<cluster resource group>", "poolNm":"<cluster pool name>", "clusterNm":"<cluster name>" }
按兩下 [觸發程式] 按鈕,就會開始執行DAG。
您可以從 DAG 執行將 DAG 工作的狀態可視化
從入口網站驗證作業執行狀況
從「Apache Flink 儀錶板」驗證工作
範例程序代碼
這是在 AKS 上使用 Airflow 搭配 HDInsight 協調數據管線的範例。
DAG 預期將為 OAuth 用戶端憑證設定服務主體,並於執行時傳遞以下輸入組態:
{
'jarName':'WordCount.jar',
'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net',
'subscription':'<cluster subscription id>',
'rg':'<cluster resource group>',
'poolNm':'<cluster pool name>',
'clusterNm':'<cluster name>'
}
參考
- 請參閱 範例程式代碼。
- Apache Flink 網站
- Apache、Apache Airflow、Airflow、Apache Flink、Flink 和相關聯的開放原始碼專案名稱是 Apache Software Foundation(ASF)的 商標。