Partager via


Orchestration de travaux Apache Flink® à l’aide du Gestionnaire d’orchestration de flux de travail Azure Data Factory (optimisé par Apache Airflow)

Important

Azure HDInsight sur AKS a été mis hors service le 31 janvier 2025. Pour en savoir plus , consultez cette annonce.

Vous devez migrer vos charges de travail vers Microsoft Fabric ou un produit Azure équivalent pour éviter l’arrêt brusque de vos charges de travail.

Important

Cette fonctionnalité est actuellement en préversion. Les Conditions d’utilisation supplémentaires pour les préversions Microsoft Azure incluent des termes juridiques supplémentaires qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou qui ne sont pas encore publiées en disponibilité générale. Pour plus d'informations sur cette préversion spécifique, consultez les informations de préversion concernant Azure HDInsight sur AKS . Pour des questions ou des suggestions de fonctionnalités, envoyez une demande sur AskHDInsight avec les détails et suivez-nous pour plus de mises à jour sur Communauté Azure HDInsight.

Cet article traite de la gestion d’une tâche Flink à l’aide de API REST Azure et du pipeline d'orchestration de données avec le Gestionnaire d'Orchestration de Flux de Travail Azure Data Factory.  Azure Data Factory Workflow Orchestration Manager service est un moyen simple et efficace de créer et de gérer environnements Apache Airflow, ce qui vous permet d’exécuter facilement des pipelines de données à grande échelle.

Apache Airflow est une plateforme open source qui crée, planifie et surveille des flux de travail de données complexes par programmation. Il vous permet de définir un ensemble de tâches, appelées opérateurs qui peuvent être combinés dans des graphiques acycliques dirigés (DAGs) pour représenter des pipelines de données.

Le diagramme suivant montre le positionnement d’Airflow, Key Vault et HDInsight sur AKS dans Azure.

Capture d’écran montrant l’emplacement d’Airflow, du coffre de clés et de HDInsight sur AKS dans Azure.

Plusieurs principaux de service Azure sont créés en fonction de l’étendue pour limiter l’accès dont il a besoin et pour gérer le cycle de vie des informations d’identification du client indépendamment.

Il est recommandé de faire pivoter régulièrement les clés d’accès ou les secrets.

Étapes de configuration

  1. Configurer le cluster Flink

  2. Chargez votre fichier jar de tâche Flink dans le compte de stockage. Il peut s’agir du compte de stockage principal associé au cluster Flink ou à tout autre compte de stockage, où vous devez attribuer le rôle « Propriétaire des données blob de stockage » au MSI affecté par l’utilisateur utilisé pour le cluster dans ce compte de stockage.

  3. Azure Key Vault : vous pouvez suivre ce didacticiel pour créer un nouveau Azure Key Vault au cas où vous n'en auriez pas.

  4. Créez principal du service Microsoft Entra pour accéder à Key Vault : accordez l’autorisation d’accéder à Azure Key Vault avec le rôle « Agent des secrets Key Vault », puis notez « appId » « mot de passe » et « locataire » à partir de la réponse. Nous devons utiliser la même option pour airflow afin d’utiliser le stockage Key Vault comme back-ends pour stocker des informations sensibles.

    az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID> 
    
  5. Activez Azure Key Vault pour le Workflow Orchestration Manager pour stocker et gérer vos informations sensibles de manière sécurisée et centralisée. En procédant ainsi, vous pouvez utiliser des variables et des connexions, et elles sont automatiquement stockées dans Azure Key Vault. Les noms des connexions et des variables doivent être préfixés par variables_prefix défini dans AIRFLOW__SECRETS__BACKEND_KWARGS. Par exemple, si variables_prefix a une valeur comme hdinsight-aks-variables, pour une clé de variable hello, vous souhaitez stocker votre variable sur hdinsight-aks-variable -hello.

    • Ajoutez les paramètres suivants pour les « overrides » de configuration d'Airflow dans les propriétés de « runtime » intégrées :

      • AIRFLOW__SECRETS__BACKEND : "airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"

      • AIRFLOW__SECRETS__BACKEND_KWARGS :
        "{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”

    • Ajoutez le paramètre suivant pour la configuration des variables d’environnement dans les propriétés du runtime intégré Airflow :

      • AZURE_CLIENT_ID = <App Id from Create Azure AD Service Principal>

      • AZURE_TENANT_ID = <Tenant from Create Azure AD Service Principal>

      • AZURE_CLIENT_SECRET = <Password from Create Azure AD Service Principal>

      Ajouter des exigences pour Airflow - apache-airflow-providers-microsoft-azure

      Capture d’écran montrant la configuration du flux d’air et les variables d’environnement.

  6. Créez principal de service Microsoft Entra pour accéder à Azure : accordez l’autorisation d’accéder au cluster AKS HDInsight avec le rôle Contributeur, notez appId, mot de passe et locataire à partir de la réponse.

    az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>

  7. Créez les secrets suivants dans votre coffre de clés avec les valeurs de l'appId, du mot de passe et du tenant du précédent principal de service AD, préfixés par la propriété "variables_prefix" définie dans AIRFLOW__SECRETS__BACKEND_KWARGS. Le code DAG peut accéder à l’une de ces variables sans variables_prefix.

    • hdinsight-aks-variables-api-client-id=<App ID from previous step>

    • hdinsight-aks-variables-api-secret=<Password from previous step>

    • hdinsight-aks-variables-tenant-id=<Tenant from previous step>

    from airflow.models import Variable 
    
    def retrieve_variable_from_akv(): 
    
        variable_value = Variable.get("client-id") 
    
        print(variable_value) 
    

Définition DAG

Un DAG (graphe acyclique dirigé) est le concept principal d'Airflow : il regroupe des tâches, organisées avec des dépendances et des relations, afin de déterminer comment elles doivent s'exécuter.

Il existe trois façons de déclarer un DAG :

  1. Vous pouvez utiliser un gestionnaire de contexte, qui ajoute le DAG à tout ce qu’il contient implicitement

  2. Vous pouvez utiliser un constructeur standard, en passant le DAG dans tous les opérateurs que vous utilisez

  3. Vous pouvez utiliser le décorateur @dag pour transformer une fonction en générateur DAG (depuis airflow.decorators importer dag)

Les daGs ne sont rien sans tâches à exécuter, et ceux-ci sont fournis sous la forme d’opérateurs, de capteurs ou de TaskFlow.

Vous pouvez en savoir plus sur les daGs, le flux de contrôle, les sous-groupes de tâches, etc. directement à partir de Apache Airflow. 

Exécution du DAG

L’exemple de code est disponible sur le git; téléchargez le code localement sur votre ordinateur et chargez le fichier wordcount.py dans le stockage Blob. Suivez les étapes pour importer DAG dans votre flux de travail créé lors de l’installation.

Le fichier wordcount.py est un exemple d'orchestration de l'envoi d'une tâche Flink à l'aide d'Apache Airflow avec HDInsight sur AKS. Le DAG a deux tâches :

  • obtenir OAuth Token

  • Appeler l’API REST Azure de soumission de travaux HDInsight Flink pour envoyer un nouveau travail

Le DAG s'attend à ce que le principal de service soit configuré, tel que décrit lors du processus de configuration des informations d'identification du client OAuth et à ce que la configuration d’entrée suivante soit transmise pour l’exécution.

Étapes d’exécution

  1. Exécutez le DAG à partir de l'interface utilisateur de flux d'air ,vous pouvez ouvrir l’interface utilisateur du Gestionnaire d'orchestration des flux de travail Azure Data Factory en cliquant sur l’icône de surveillance.

    Capture d’écran montrant l’interface utilisateur du Gestionnaire d’orchestration de flux de travail Azure Data Factory en cliquant sur l’icône surveiller.

  2. Sélectionnez le DAG « FlinkWordCountExample » dans la page « DAGs ».

    Capture d’écran montrant la sélection de l’exemple de décompte de mots Flink.

  3. Cliquez sur l’icône « Exécuter » dans le coin supérieur droit, puis sélectionnez « Déclencher le DAG w/ config ».

    Capture d’écran montrant l’icône d’exécution sélectionnée.

  4. Passer une configuration JSON requise

    { 
    
      "jarName":"WordCount.jar", 
    
      "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", 
    
      "subscription":"<cluster subscription id>", 
    
      "rg":"<cluster resource group>", 
    
      "poolNm":"<cluster pool name>", 
    
      "clusterNm":"<cluster name>" 
    
    } 
    
  5. Cliquez sur le bouton « Déclencheur », il démarre l’exécution du DAG.

  6. Vous pouvez visualiser l’état des tâches DAG pendant l’exécution du DAG

    La capture d’écran montre l’état de la tâche DAG.

  7. Valider l’exécution du travail à partir du portail

    Capture d’écran montrant la validation de l’exécution du travail.

  8. Valider le travail à partir de « Tableau de bord Apache Flink »

    Capture d’écran montrant le tableau de bord Apache Flink.

Exemple de code

Il s’agit d’un exemple d’orchestration du pipeline de données à l’aide d’Airflow avec HDInsight sur AKS.

Le DAG s'attend à ce que le Service Principal soit mis en place pour l'accréditation du client OAuth et à passer la configuration d'entrée suivante pour l'exécution :

{
 'jarName':'WordCount.jar',
 'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net', 
 'subscription':'<cluster subscription id>',
 'rg':'<cluster resource group>', 
 'poolNm':'<cluster pool name>',
 'clusterNm':'<cluster name>'
 }

Référence