Dela via


Apache Flink-jobborkestrering® med Azure Data Factory Workflow Orchestration Manager (drivs av Apache Airflow)

Viktig

Azure HDInsight på AKS drogs tillbaka den 31 januari 2025. Läs mer med det här meddelandet.

Du måste migrera dina arbetsbelastningar till Microsoft Fabric- eller en motsvarande Azure-produkt för att undvika plötsliga uppsägningar av dina arbetsbelastningar.

Viktig

Den här funktionen är för närvarande i förhandsversion. De kompletterande användningsvillkoren för Förhandsversioner av Microsoft Azure innehåller fler juridiska villkor som gäller för Azure-funktioner som är i betaversion, förhandsversion eller på annat sätt ännu inte har gjorts allmänt tillgängliga. Information om den här specifika förhandsversionen finns i Azure HDInsight på AKS-förhandsversionsinformation. För frågor eller funktionsförslag, skicka en begäran på AskHDInsight med detaljerna och följ oss för fler uppdateringar från Azure HDInsight Community.

Den här artikeln beskriver hur du hanterar ett Flink-jobb med hjälp av Azure REST API och orkestreringsdatapipeline med Azure Data Factory Workflow Orchestration Manager.  Azure Data Factory Workflow Orchestration Manager-tjänsten är ett enkelt och effektivt sätt att skapa och hantera Apache Airflow miljöer så att du enkelt kan köra datapipelines i stor skala.

Apache Airflow är en plattform med öppen källkod som programmässigt skapar, schemalägger och övervakar komplexa dataarbetsflöden. Det gör att du kan definiera en uppsättning uppgifter som kallas operatorer som kan kombineras till riktade acykliska grafer (DAG:er) för att representera datapipelines.

Följande diagram visar placeringen av Airflow, Key Vault och HDInsight på AKS i Azure.

Skärmbild som visar placeringen av luftflöde, nyckelvalv och HDInsight på AKS i Azure.

Flera Azure Service-huvudnamn skapas baserat på omfånget för att begränsa den åtkomst som krävs och för att hantera klientens livscykel för autentiseringsuppgifter oberoende av varandra.

Vi rekommenderar att du roterar åtkomstnycklar eller hemligheter med jämna mellanrum.

Installationssteg

  1. Konfigurera Flink-kluster

  2. Ladda upp din Flink Job jar-fil till lagringskontot. Det kan vara det primära lagringskontot som är associerat med Flink-klustret eller något annat lagringskonto, där du bör tilldela rollen "Storage Blob Data Owner" till den användartilldelade MSI som används för klustret i det här lagringskontot.

  3. Azure Key Vault – Du kan följa den här handledningen för att skapa en ny Azure Key Vault i fall du inte har en.

  4. Skapa Microsoft Entra-tjänsthuvudnamn för att få åtkomst till Key Vault – Bevilja behörighet att komma åt Azure Key Vault med rollen "Key Vault Secrets Officer" och anteckna "appId", "lösenord" och "hyrkesgästen" från svaret. Vi behöver använda samma sak så att Airflow kan använda Key Vault-lagring som backend för att lagra känslig information.

    az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID> 
    
  5. Aktivera Azure Key Vault for Workflow Orchestration Manager för att lagra och hantera känslig information på ett säkert och centraliserat sätt. Genom att göra detta kan du använda variabler och anslutningar, och de lagras automatiskt i Azure Key Vault. Namnen på anslutningar och variabler måste föregås av variables_prefix som definieras i AIRFLOW__SECRETS__BACKEND_KWARGS. Om variables_prefix till exempel har ett värde som hdinsight-aks-variables vill du lagra variabeln i hdinsight-aks-variable -hello för en variabelnyckel för hello.

    • Lägg till följande inställningar för åsidosättningar av Airflow-konfigurationen i integrerade körningsegenskaper:

      • AIRFLOW__SECRETS__BACKEND: "airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"

      • AIRFLOW__SECRETS__BACKEND_KWARGS:
        "{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”

    • Lägg till följande inställning för konfigurationen av miljövariabler i egenskaperna för airflow-integrerad körning:

      • AZURE_CLIENT_ID = <App Id from Create Azure AD Service Principal>

      • AZURE_TENANT_ID = <Tenant from Create Azure AD Service Principal>

      • AZURE_CLIENT_SECRET = <Password from Create Azure AD Service Principal>

      Lägg till Airflow-krav – apache-airflow-providers-microsoft-azure

      Skärmbild som visar konfiguration av luftflöde och miljövariabler.

  6. Skapa Microsoft Entra-tjänsthuvudnamn för att få åtkomst till Azure – Bevilja behörighet att komma åt HDInsight och AKS-kluster med deltagarroll, anteckna appId, lösenord och tenant från svaret.

    az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>

  7. Skapa följande hemligheter i ditt nyckelvalv med värdena från tidigare AD-tjänstens huvudappId, lösenord och klientorganisation, prefixat med variabel "variables_prefix" definierad i AIRFLOW__SECRETS__BACKEND_KWARGS. DAG-koden kan komma åt någon av dessa variabler utan variables_prefix.

    • hdinsight-aks-variables-api-client-id=<App ID from previous step>

    • hdinsight-aks-variables-api-secret=<Password from previous step>

    • hdinsight-aks-variables-tenant-id=<Tenant from previous step>

    from airflow.models import Variable 
    
    def retrieve_variable_from_akv(): 
    
        variable_value = Variable.get("client-id") 
    
        print(variable_value) 
    

DAG-definition

En DAG (Directed Acyclic Graph) är kärnkonceptet i Airflow, som samlar uppgifter tillsammans och organiserar dem med beroenden och relationer för att bestämma hur de ska köras.

Det finns tre sätt att deklarera en DAG:

  1. Du kan använda en kontexthanterare som automatiskt lägger till DAG i allt som finns där inom.

  2. Du kan använda en standardkonstruktor som skickar DAG till alla operatorer som du använder

  3. Du kan använda @dag-dekorationen för att förvandla en funktion till en DAG-generator (från airflow.decorators import dag)

DAG:er är ingenting utan uppgifter som ska köras, och de kommer i form av operatorer, sensorer eller aktivitetsflöde.

Du kan läsa mer om DAG:er, Kontrollflöde, SubDAG:ar, Taskgrupper osv. direkt från Apache Airflow. 

DAG-körning

Exempelkod finns på git-; ladda ned koden lokalt på datorn och ladda upp wordcount.py till en bloblagring. Följ steg för att importera DAG till arbetsflödet som skapades under installationen.

Wordcount.py är ett exempel på att orkestrera en Flink-jobböverföring med Apache Airflow och HDInsight på AKS. DAG har två uppgifter:

  • hämta OAuth Token

  • Anropa HDInsight Flink Job Submission Azure REST API för att skicka ett nytt jobb

DAG förväntar sig att ha konfigurationen för tjänstens huvudidentitet, som beskrivs under installationsprocessen för OAuth-klientens behörigheter, samt skicka följande indatakonfiguration för att köra.

Exekveringssteg

  1. Kör DAG från Airflow-användargränssnittet. Du kan öppna Azure Data Factory Workflow Orchestration Manager-användargränssnittet genom att klicka på övervakningsikonen.

    Skärmbild som visar hur du öppnar användargränssnittet för Azure Data Factory Workflow Orchestration Manager genom att klicka på övervakningsikonen.

  2. Välj "FlinkWordCountExample" DAG från sidan "DAG:er".

    Skärmbild som visar hur du väljer exempel på antal Flink-ord.

  3. Klicka på ikonen "kör" i det övre högra hörnet och välj "Starta DAG med konfiguration".

    Skärmbild som visar ikonen välj kör.

  4. Skicka nödvändig konfigurations-JSON

    { 
    
      "jarName":"WordCount.jar", 
    
      "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", 
    
      "subscription":"<cluster subscription id>", 
    
      "rg":"<cluster resource group>", 
    
      "poolNm":"<cluster pool name>", 
    
      "clusterNm":"<cluster name>" 
    
    } 
    
  5. Klicka på knappen "Utlösare", det börjar körningen av DAG.

  6. Du kan visualisera statusen för DAG-uppgifter från en DAG-körning

    Skärmbild som visar daguppgiftens status.

  7. Verifiera jobbexekveringen från portalen

    Skärmbild som visar verifiering av jobbkörning.

  8. Verifiera jobbet från "Apache Flink Dashboard"

    Skärmbild som visar apache Flink-instrumentpanelen.

Exempelkod

Det här är ett exempel på att orkestrera datapipelines med hjälp av Airflow genom HDInsight på AKS.

DAG förväntar sig att ha en inställning för tjänstehuvudprincip för OAuth-klientens autentiseringsuppgifter och skicka följande konfiguration för indata för att genomföra körningen.

{
 'jarName':'WordCount.jar',
 'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net', 
 'subscription':'<cluster subscription id>',
 'rg':'<cluster resource group>', 
 'poolNm':'<cluster pool name>',
 'clusterNm':'<cluster name>'
 }

Hänvisning