次の方法で共有


Azure Data Factory Workflow Orchestration Manager を使用した Apache Flink® ジョブ オーケストレーション (Apache Airflow を利用)

Note

Azure HDInsight on AKS は 2025 年 1 月 31 日に廃止されます。 2025 年 1 月 31 日より前に、ワークロードを Microsoft Fabric または同等の Azure 製品に移行することで、ワークロードの突然の終了を回避する必要があります。 サブスクリプション上に残っているクラスターは停止され、ホストから削除されることになります。

提供終了日までは基本サポートのみが利用できます。

重要

現在、この機能はプレビュー段階にあります。 ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用されるその他の法律条項については、「Microsoft Azure プレビューの追加の使用条件」に記載されています。 この特定のプレビューについては、「Microsoft HDInsight on AKS のプレビュー情報」を参照してください。 質問や機能の提案については、詳細を記載した要求を AskHDInsight で送信してください。また、その他の更新情報については、Azure HDInsight コミュニティのフォローをお願いいたします。

この記事では、Azure REST API を使用した Flink ジョブの管理と、Azure Data Factory Workflow Orchestration Manager を使用したオーケストレーション データ パイプラインについて説明します。 Azure Data Factory Workflow Orchestration Manager サービスは、Apache Airflow 環境を作成および管理するためのシンプルで効率的な方法であり、データ パイプラインの簡単な大規模な実行を可能にします。

Apache Airflow は、複雑なデータ ワークフローをプログラムで作成、スケジュール、監視するオープンソース プラットフォームです。 これにより、オペレーターと呼ばれる一連のタスクを定義でき、有向非循環グラフ (DAG) と組み合わせてデータ パイプラインを表すことができます。

次の図は、Azure での Airflow、Key Vault、HDInsight on AKS の配置を示しています。

スクリーンショットは、Azure でのエアフロー、キー コンテナー、HDInsight on AKS の配置を示しています。

必要なアクセスを制限し、クライアント資格情報のライフ サイクルを個別に管理するために、スコープに基づいて複数の Azure サービス プリンシパルが作成されます。

アクセス キーまたはシークレットを定期的にローテーションすることをお勧めします。

設定手順

  1. Flink クラスターをセットアップします

  2. Flink Job jar をストレージ アカウントにアップロードします。 Flink クラスターに関連付けられているプライマリ ストレージ アカウントまたは他のストレージ アカウントを使用できます。このストレージ アカウントには、クラスターに使用されるユーザー割り当て MSI への “ストレージ BLOB データ所有者”ロールを割り当てる必要があります。

  3. Azure Key Vault - 所有していない場合は、このチュートリアルに従って新しい Azure Key Vault を作成できます。

  4. Key Vault にアクセスするための Microsoft Entra サービス プリンシパルを作成します。“Key Vault Secrets Officer” ロールを使用して Azure Key Vault にアクセスするためのアクセス許可を付与し、応答から ‘appId’‘password’、‘tenant’ を書き留めます。 機密情報を格納するためのバックエンドとして Key Vault ストレージを使用するには、Airflow で同じものを使用する必要があります。

    az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID> 
    
  5. Workflow Orchestration Manager 用の Azure Key Vault を有効にして、機密情報を安全かつ一元的な方法で格納および管理します。 これを行うことで、変数と接続を使用することができ、それらは自動的に Azure Key Vault に格納されます。 接続と変数の名前には、AIRFLOW__SECRETS__BACKEND_KWARGS で定義した variables_prefix のプレフィックスを付ける必要があります。 たとえば、variables_prefixの値が hdinsight-aks-variables の場合、hello の変数キーに対して、変数を hdinsight-aks-variable -hello に格納する必要がありますす。

    • 統合ランタイム プロパティの [Airflow 構成のオーバーライド] に以下の設定を追加します。

      • AIRFLOW__SECRETS__BACKEND: "airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"

      • AIRFLOW__SECRETS__BACKEND_KWARGS:
        "{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”

    • Airflow 統合ランタイム プロパティの環境変数の構成に以下の設定を追加します。

      • AZURE_CLIENT_ID = <App Id from Create Azure AD Service Principal>

      • AZURE_TENANT_ID = <Tenant from Create Azure AD Service Principal>

      • AZURE_CLIENT_SECRET = <Password from Create Azure AD Service Principal>

      Airflow 要件を追加する - apache-airflow-providers-microsoft-azure

      エアフロー構成と環境変数を示すスクリーンショット。

  6. Azure にアクセスするための Microsoft Entra サービス プリンシパルを作成します。“共同作成者” ロールを使用して HDInsight AKS クラスターにアクセスするためのアクセス許可を付与し、応答から appId、password、tenant を書き留めます。

    az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>

  7. 前の AD サービス プリンシパルの appId、password、tenent からの値を使用してキー コンテナーに次のシークレットを作成し、AIRFLOW__SECRETS__BACKEND_KWARGS で定義されているプロパティ variables_prefix のプレフィックスを付けます。 DAG コードは、variables_prefix なしでこれらすべての変数にアクセスできます。

    • hdinsight-aks-variables-api-client-id=<App ID from previous step>

    • hdinsight-aks-variables-api-secret=<Password from previous step>

    • hdinsight-aks-variables-tenant-id=<Tenant from previous step>

    from airflow.models import Variable 
    
    def retrieve_variable_from_akv(): 
    
        variable_value = Variable.get("client-id") 
    
        print(variable_value) 
    

DAG の定義

DAG (有向非循環グラフ) は Airflow の核となる概念であり、タスクをまとめて収集し、依存関係とリレーションシップを使用して編成し、それらの実行方法を示します。

DAG を宣言するには、次の 3 つの方法があります。

  1. コンテキスト マネージャーを使用して、その中のものに DAG を暗黙的に追加することができます

  2. 標準コンストラクターを使用して、使用する任意のオペレーターに DAG を渡すことができます

  3. @dag デコレーターを使用して、関数を DAG ジェネレーターに変換できます (from airflow.decorators import dag)

DAG は実行するタスクがなければ存在できず、Operator、Sensor、または TaskFlow のいずれかの形式で提供されます。

DAG、制御フロー、SubDAG、TaskGroup などの詳細については、Apache Airflowを直接参照してください。 

DAG の実行

サンプル コードは git で入手できます。コードをコンピューター上でローカルにダウンロードし、wordcount.py を BLOB ストレージにアップロードします。 手順に従って、セットアップ中に作成したワークフローに DAG をインポートします。

wordcount.py は、HDInsight on AKS で Apache Airflow を使用して Flink ジョブの送信を調整する例です。 DAG には、次の 2 つのタスクがあります。

  • get OAuth Token

  • HDInsight Flink ジョブ送信 Azure REST API を呼び出して新しいジョブを送信する

DAG では、OAuth クライアント資格情報のセットアップ プロセス中に説明されているように、サービス プリンシパルのセットアップを行い、実行のために次の入力構成を渡す必要があります。

実行ステップ

  1. Airflow UI から DAG を実行します。モニター アイコンをクリックして Azure Data Factory Workflow Orchestration Manager UI を開くことができます。

    モニター アイコンをクリックして Azure Data Factory Workflow Orchestration Manager UI を開く、を示すスクリーンショット。

  2. [DAG] ページから "FlinkWordCountExample" の DAG を選択します。

    スクリーンショットは、Flink ワード カウントの選択の例を示しています。

  3. 右上隅にある "実行" アイコンをクリックし、[Trigger DAG w/ config]\(構成ありで DAG をトリガー\) を選択します。

    スクリーンショットは、実行の選択アイコンを示しています。

  4. 必須の構成 JSON を渡します

    { 
    
      "jarName":"WordCount.jar", 
    
      "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", 
    
      "subscription":"<cluster subscription id>", 
    
      "rg":"<cluster resource group>", 
    
      "poolNm":"<cluster pool name>", 
    
      "clusterNm":"<cluster name>" 
    
    } 
    
  5. "トリガー" ボタンをクリックすると、DAG の実行が開始されます。

  6. DAG 実行から DAG タスクの状態を視覚化できます

    dag タスクの状態を示すスクリーンショット。

  7. ポータルからジョブ実行を検証します

    ジョブ実行の検証を示すスクリーンショット。

  8. "Apache Flink ダッシュボード" からジョブを検証します

    Apache Flink ダッシュボードを示すスクリーンショット。

コード例

これは、HDInsight on AKS で Airflow を使用してデータ パイプラインを調整する例です。

DAG では、OAuth クライアント資格情報のサービス プリンシパルのセットアップを行い、実行のために次の入力構成を渡す必要があります。

{
 'jarName':'WordCount.jar',
 'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net', 
 'subscription':'<cluster subscription id>',
 'rg':'<cluster resource group>', 
 'poolNm':'<cluster pool name>',
 'clusterNm':'<cluster name>'
 }

リファレンス