Freigeben über


Die Apache Flink®-Auftragsorchestrierung mit dem Azure Data Factory Workflow Orchestration Manager (unterstützt von Apache Airflow)

Hinweis

Azure HDInsight on AKS wird am 31. Januar 2025 eingestellt. Vor dem 31. Januar 2025 müssen Sie Ihre Workloads zu Microsoft Fabric oder einem gleichwertigen Azure-Produkt migrieren, um eine abruptes Beendigung Ihrer Workloads zu vermeiden. Die verbleibenden Cluster in Ihrem Abonnement werden beendet und vom Host entfernt.

Bis zum Einstellungsdatum ist nur grundlegende Unterstützung verfügbar.

Wichtig

Diese Funktion steht derzeit als Vorschau zur Verfügung. Die zusätzlichen Nutzungsbedingungen für Microsoft Azure-Vorschauen enthalten weitere rechtliche Bestimmungen, die für Azure-Features in Betaversionen, in Vorschauversionen oder anderen Versionen gelten, die noch nicht allgemein verfügbar gemacht wurden. Informationen zu dieser spezifischen Vorschau finden Sie unter Informationen zur Vorschau von Azure HDInsight on AKS. Bei Fragen oder Funktionsvorschlägen senden Sie eine Anfrage an AskHDInsight mit den entsprechenden Details, und folgen Sie uns für weitere Updates in der Azure HDInsight-Community.

Dieser Artikel befasst sich mit der Verwaltung von Flink-Aufträgen mithilfe der Azure-REST-API und der Orchestrierungsdatenpipeline mit Azure Data Factory Workflow Orchestration Manager. Der Dienst Azure Data Factory Workflow Orchestration Manager-Dienst ist ein einfacher und effizienter Weg, um Apache Airflow-Umgebungen zu erstellen und zu verwalten, so dass Sie Datenpipelines in großem Umfang ganz einfach ausführen können.

Apache Airflow ist eine Open-Source-Plattform zum programmgesteuerten Erstellen, Planen und Überwachen komplexer Datenworkflows. Sie ermöglicht es Ihnen, eine Reihe von Aufgaben zu definieren, die als Operatoren bezeichnet werden und zu gerichteten azyklischen Graphen (Directed Acyclic Graphs, DAGs) kombiniert werden können, um Datenpipelines darzustellen.

Das folgende Diagramm zeigt die Platzierung von Airflow, Key Vault und HDInsight on AKS in Azure:

Screenshot der Platzierung von Airflow, Key Vault und HDInsight auf AKS in Azure.

Mehrere Azure-Dienstprinzipale werden basierend auf dem Bereich erstellt, um den benötigten Zugriff einzuschränken und den Lebenszyklus von Clientanmeldeinformationen unabhängig voneinander zu verwalten.

Es wird empfohlen, Zugriffsschlüssel oder Geheimnisse regelmäßig zu rotieren.

Schritte zum Einrichten

  1. Einrichten eines Flink-Clusters

  2. Laden Sie die JAR-Datei für Ihren Flink-Auftrag in das Speicherkonto hoch. Dies kann das primäre Speicherkonto, das dem Flink-Cluster zugeordnet ist, oder ein anderes Speicherkonto sein, für das die Rolle „Besitzer von Speicherblobdaten“ der benutzerseitig zugewiesenen MSI zugewiesen werden sollte, die für den Cluster für dieses Speicherkonto verwendet wird.

  3. Azure Key Vault: Sie können dieses Tutorial befolgen, um eine neue Azure Key Vault-Instanz zu erstellen, falls Sie noch keine besitzen.

  4. Erstellen Sie einen Microsoft Entra-Dienstprinzipal, um auf Key Vault zuzugreifen: Gewähren Sie die Berechtigung zum Zugreifen auf Azure Key Vault mit der Rolle „Geheimnisbeauftragter für Schlüsseltresore“, und notieren Sie sich die Werte für „appId“, „password“ und „tenant“ aus der Antwort. Wir müssen für Airflow dieselben Werte verwenden, um den Key Vault-Speicher als Back-Ends zum Speichern vertraulicher Informationen zu nutzen.

    az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID> 
    
  5. Aktivieren Sie Azure Key Vault für Workflow Orchestration Manager, um Ihre vertraulichen Informationen auf sichere und zentralisierte Weise zu speichern und zu verwalten. Dadurch können Sie Variablen und Verbindungen verwenden, die automatisch in Azure Key Vault gespeichert werden. Dem Namen von Verbindungen und Variablen muss wie in AIRFLOW__SECRETS__BACKEND_KWARGS definiert das Präfix „variables_prefix“ vorangestellt werden. Wenn „variables_prefix“ z. B. den Wert „hdinsight-aks-variables“ hat, müssen Sie für den Variablenschlüssel „hello“ die Variable unter „hdinsight-aks-variable -hello“ speichern.

    • Fügen Sie die folgenden Einstellungen für die Airflow-Konfigurationsüberschreibungen in den Integrated Runtime-Eigenschaften hinzu:

      • AIRFLOW__SECRETS__BACKEND: "airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"

      • AIRFLOW__SECRETS__BACKEND_KWARGS:
        "{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”

    • Fügen Sie die folgende Einstellung für die Konfiguration von Umgebungsvariablen in den Integrated Runtime-Eigenschaften von Airflow hinzu:

      • AZURE_CLIENT_ID = <App Id from Create Azure AD Service Principal>

      • AZURE_TENANT_ID = <Tenant from Create Azure AD Service Principal>

      • AZURE_CLIENT_SECRET = <Password from Create Azure AD Service Principal>

      Hinzufügen von Airflow-Anforderungen: apache-airflow-providers-microsoft-azure

      Screenshot der Airflow-Konfiguration und -Umgebungsvariablen.

  6. Erstellen Sie einen Microsoft Entra-Dienstprinzipal, um auf Azure zuzugreifen: Gewähren Sie die Berechtigung zum Zugreifen auf den HDInsight-AKS-Cluster mit der Rolle „Mitwirkender“, und notieren Sie sich die Werte für „appId“, „password“ und „tenant“ aus der Antwort.

    az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>

  7. Erstellen Sie die folgenden Geheimnisse in Ihrem Schlüsseltresor mit den oben notierten Werten des Dienstprinzipals für „appId“, „password“ und „tenant“. Fügen Sie die in AIRFLOW__SECRETS__BACKEND_KWARGS definierte Eigenschaft „variables_prefix“ als Präfix hinzu. Der DAG-Code kann ohne „variables_prefix“ auf jede dieser Variablen zugreifen.

    • hdinsight-aks-variables-api-client-id=<App ID from previous step>

    • hdinsight-aks-variables-api-secret=<Password from previous step>

    • hdinsight-aks-variables-tenant-id=<Tenant from previous step>

    from airflow.models import Variable 
    
    def retrieve_variable_from_akv(): 
    
        variable_value = Variable.get("client-id") 
    
        print(variable_value) 
    

DAG-Definition

Ein DAG (Directed Acyclic Graph, gerichteter azyklischer Graph) ist das Kernkonzept von Airflow. Er fasst Aufgaben zusammen und organisiert sie mit Abhängigkeiten und Beziehungen, um anzugeben, wie sie ausgeführt werden sollen.

Es stehen drei Methoden zum Deklarieren eines DAG zur Verfügung:

  1. Sie können einen Kontext-Manager verwenden, der den DAG implizit zu allen darin enthaltenen Elementen hinzufügt.

  2. Sie können einen Standardkonstruktor verwenden und den DAG an alle von Ihnen verwendeten Operatoren übergeben.

  3. Sie können das Decorator-Element @dag verwenden, um eine Funktion in einen DAG-Generator umzuwandeln (aus „airflow.decorators import dag“).

DAGs sind ohne auszuführende Aufgaben wertlos, und diese liegen in Form von Operatoren, Sensoren oder TaskFlows vor.

Weitere Details zu DAGs, Ablaufsteuerung, SubDAGs, TaskGroups usw. finden Sie direkt unter Apache Airflow. 

DAG-Ausführung

Beispielcode ist auf git verfügbar. Laden Sie den Code lokal auf Ihren Computer herunter, und laden Sie „wordcount.py“ in einen Blobspeicher hoch. Führen Sie die Schritte aus, um den gerichteten azyklischen Graphen in Ihre Workflow zu importieren, die während der Einrichtung erstellt wurde.

„wordcount.py“ ist ein Beispiel für die Orchestrierung einer Flink-Auftragsübermittlung mithilfe von Apache Airflow mit HDInsight on AKS. Der DAG hat zwei Aufgaben:

  • Abrufen von OAuth Token

  • Aufrufen der Azure-REST-API für die HDInsight Flink-Auftragsübermittlung zum Senden eines neuen Auftrags

Der DAG erwartet, dass der Dienstprinzipal wie im Setupprozess für die OAuth-Clientanmeldeinformationen beschrieben eingerichtet wurde und dass die folgende Eingabekonfiguration für die Ausführung übergeben wird.

Ausführungsschritte

  1. Führen Sie den gerichteten azyklischen Graphen über die Airflow-Benutzeroberfläche aus. Sie können die Workflow Orchestration Manager-Benutzeroberfläche für Azure Data Factory öffnen, indem Sie das Überwachungssymbol auswählen.

    Der Screenshot zeigt, wie Sie die Azure Data Factory Workflow Orchestration Manager-Benutzeroberfläche öffnen, indem Sie das Monitor-Symbol auswählen.

  2. Wählen Sie den DAG „FlinkWordCountExample“ auf der Seite „DAGs“ aus.

    Screenshot eines Beispiels für die Wortzählung in Flink.

  3. Klicken Sie in der oberen rechten Ecke auf das Ausführungssymbol, und wählen Sie „Trigger DAG w/ config“ aus.

    Screenshot des ausgewählten Ausführungssymbols.

  4. Übergeben des erforderlichen JSON-Konfigurationscodes

    { 
    
      "jarName":"WordCount.jar", 
    
      "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", 
    
      "subscription":"<cluster subscription id>", 
    
      "rg":"<cluster resource group>", 
    
      "poolNm":"<cluster pool name>", 
    
      "clusterNm":"<cluster name>" 
    
    } 
    
  5. Klicken Sie auf die Schaltfläche „Trigger“. Dadurch wird die Ausführung des DAG gestartet.

  6. Sie können den Status von DAG-Aufgaben über die DAG-Ausführung visualisieren.

    Screenshot des Status der gerichteter azyklischer Graph-Aufgabe.

  7. Überprüfen der Auftragsausführung über das Portal

    Screenshot der Ausführung des Auftrags zur Validierung.

  8. Überprüfen des Auftrags auf dem Apache Flink-Dashboard

    Screenshot des Apache Flink-Dashboards.

Beispielcode

Dies ist ein Beispiel für die Orchestrierung einer Datenpipeline mithilfe von Airflow mit HDInsight on AKS.

Der DAG erwartet, dass der Dienstprinzipal für die OAuth-Clientanmeldeinformationen eingerichtet wurde und dass die folgende Eingabekonfiguration für die Ausführung übergeben wird:

{
 'jarName':'WordCount.jar',
 'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net', 
 'subscription':'<cluster subscription id>',
 'rg':'<cluster resource group>', 
 'poolNm':'<cluster pool name>',
 'clusterNm':'<cluster name>'
 }

Verweis