Azure Data Factory ワークフロー オーケストレーション マネージャーを使用した Apache Flink® ジョブ オーケストレーション (Apache エアフローを利用)
大事な
AKS 上の Azure HDInsight は、2025 年 1 月 31 日に廃止されました。 この発表 を通じて、について詳しく知ってください。
ワークロードの突然の終了を回避するには、ワークロードを Microsoft Fabric または同等の Azure 製品 に移行する必要があります。
大事な
この機能は現在プレビュー段階です。 Microsoft Azure プレビューの 追加使用条件 には、ベータ版、プレビュー版、または一般公開されていない Azure 機能に適用される、より多くの法的条件が含まれています。 この特定のプレビューの詳細については、AKS プレビュー情報 Azure HDInsightを参照してください。 ご質問や機能の提案がある場合は、詳細を記載した要求をAskHDInsight に送信してください。また、最新情報を得られるよう、Azure HDInsight Community をフォローしてください。
この記事では、Azure Data Factory ワークフロー オーケストレーション マネージャーを使用した Azure REST API とオーケストレーション データ パイプライン 使用した Flink ジョブの管理について説明します。 Azure Data Factory Workflow Orchestration Manager サービスは、Apache Flow 環境 作成および管理するためのシンプルで効率的な方法であり、大規模なデータ パイプラインを簡単に実行できます。
Apache エアフローは、プログラムによって複雑なデータ ワークフローを作成、スケジュール、監視するオープンソース プラットフォームです。 これにより、データ パイプラインを表すために有向非循環グラフ (DAG) に組み合わせることができる演算子と呼ばれる一連のタスクを定義できます。
次の図は、Azure の AKS でのエアフロー、Key Vault、HDInsight の配置を示しています。
必要なアクセスを制限し、クライアント資格情報のライフ サイクルを個別に管理するために、スコープに基づいて複数の Azure サービス プリンシパルが作成されます。
アクセス キーまたはシークレットを定期的にローテーションすることをお勧めします。
セットアップ手順
Flink Job jar をストレージ アカウントにアップロードします。 Flink クラスターまたは他のストレージ アカウントに関連付けられているプライマリ ストレージ アカウントを使用できます。このストレージ アカウントでは、クラスターに使用されるユーザー割り当て MSI に "ストレージ BLOB データ所有者" ロールを割り当てる必要があります。
Azure Key Vault - このチュートリアル 従って、新しい Azure Key Vault を作成できます (ない場合)。
Key Vault アクセスするための Microsoft Entra サービス プリンシパル を作成する – "Key Vault Secrets Officer" ロールを持つ Azure Key Vault にアクセスするためのアクセス許可を付与し、応答から 'appId' 'password' と 'tenant' を書き留めます。 機密情報を格納するためのバックエンドとして Key Vault ストレージを使用するには、エアフローでも同じものを使用する必要があります。
az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID>
ワークフロー オーケストレーション マネージャー Azure Key Vault を有効にして、セキュリティで保護された一元化された方法で機密情報を格納および管理します。 これにより、変数と接続を使用でき、Azure Key Vault に自動的に格納されます。 接続と変数の名前には、AIRFLOW__SECRETS__BACKEND_KWARGSで定義されたvariables_prefixをプレフィックスとして付ける必要があります。 たとえば、variables_prefixの値が hdinsight-aks-variables の場合、hello の変数キーに対して、変数を hdinsight-aks-variable -hello に格納します。
統合ランタイム プロパティで、エアフロー構成のオーバーライドに次の設定を追加します。
AIRFLOW__SECRETS__BACKEND:
"airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"
AIRFLOW__SECRETS__BACKEND_KWARGS:
"{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”
エアフロー統合ランタイム プロパティに、環境変数の構成に次の設定を追加します。
AZURE_CLIENT_ID =
<App Id from Create Azure AD Service Principal>
AZURE_TENANT_ID =
<Tenant from Create Azure AD Service Principal>
AZURE_CLIENT_SECRET =
<Password from Create Azure AD Service Principal>
エアフローの要件を追加する - apache-airflow-providers-microsoft-azure
Azure アクセスするための Microsoft Entra サービス プリンシパル を作成する – 共同作成者ロールを持つ HDInsight AKS クラスターにアクセスするためのアクセス許可を付与し、応答から appId、パスワード、テナントを書き留めます。
az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>
キーコンテナーに、前のADサービスプリンシパルから取得したappId、パスワード、およびテナントの値に、AIRFLOW__SECRETS__BACKEND_KWARGSで定義されたプロパティのプレフィックスvariables_prefixを付けた上で次のシークレットを作成します。 DAG コードは、variables_prefixなしでこれらの変数にアクセスできます。
hdinsight-aks-variables-api-client-id=
<App ID from previous step>
hdinsight-aks-variables-api-secret=
<Password from previous step>
hdinsight-aks-variables-tenant-id=
<Tenant from previous step>
from airflow.models import Variable def retrieve_variable_from_akv(): variable_value = Variable.get("client-id") print(variable_value)
DAG 定義
DAG (有向非循環グラフ) は、エアフローの核となる概念であり、タスクをまとめて収集し、依存関係とリレーションシップを使用して、それらの実行方法を示します。
DAG を宣言するには、次の 3 つの方法があります。
コンテキスト マネージャーを使用できます。このマネージャーは、その中の任意のオブジェクトに DAG を暗黙的に追加します。
標準コンストラクターを使用して、使用する任意の演算子に DAG を渡すことができます
@dag デコレータを使用して、関数を DAG ジェネレーターに変換できます (エアフローデコレータから dag をインポートします)。
DAG は実行するタスクなしでは何もなく、演算子、センサー、または TaskFlow のいずれかの形式で提供されます。
DAG、制御フロー、サブDAG、タスクグループなどの詳細については、Apache Airflowから直接読むことができます。
DAG の実行
サンプル コードは、gitで入手できます。コードをコンピューター上でローカルにダウンロードし、wordcount.py を BLOB ストレージにアップロードします。 手順に従って、セットアップ中に作成したワークフローに DAG をインポートします。
wordcount.py は、AKS 上の HDInsight で Apache エアフローを使用して Flink ジョブの送信を調整する例です。 DAG には、次の 2 つのタスクがあります。
OAuth Token
を取得するHDInsight Flink Job Submission Azure REST API を呼び出して新しいジョブを送信する
DAG は、OAuth クライアント資格情報のセットアップ プロセス中に説明されているように、サービス プリンシパルのセットアップを行い、実行のために次の入力構成を渡す必要があります。
実行手順
エアフロー UIから DAG を実行します。[モニター] アイコンをクリックして Azure Data Factory ワークフロー オーケストレーション マネージャー UI を開くことができます。
[DAG] ページから "FlinkWordCountExample" DAG を選択します。
右上隅にある [実行] アイコンをクリックし、[構成で DAG をトリガーする] を選択します。
必要な構成 JSON を渡す
{ "jarName":"WordCount.jar", "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", "subscription":"<cluster subscription id>", "rg":"<cluster resource group>", "poolNm":"<cluster pool name>", "clusterNm":"<cluster name>" }
[トリガー] ボタンをクリックすると、DAG の実行が開始されます。
DAG 実行から DAG タスクの状態を視覚化できます
ポータルからジョブの実行を検証する
"Apache Flink ダッシュボード" からジョブを検証する
コード例
これは、AKS 上の HDInsight でエアフローを使用してデータ パイプラインを調整する例です。
DAG は、OAuth クライアント資格情報のサービス プリンシパルのセットアップを行い、実行のために次の入力構成を渡す必要があります。
{
'jarName':'WordCount.jar',
'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net',
'subscription':'<cluster subscription id>',
'rg':'<cluster resource group>',
'poolNm':'<cluster pool name>',
'clusterNm':'<cluster name>'
}
参考
- サンプル コードを参照してください。
- Apache Flink Web サイト
- Apache、Apache エアフロー、エアフロー、Apache Flink、Flink、および関連するオープン ソース プロジェクト名 は、Apache Software Foundation (ASF) の 商標です。