Sdílet prostřednictvím


Orchestrace úloh Apache Flink® pomocí Správce orchestrace pracovních postupů služby Azure Data Factory (využívající Apache Airflow)

Důležitý

Azure HDInsight v AKS byl vyřazen 31. ledna 2025. Zjistěte více v tomto oznámení.

Abyste se vyhnuli náhlému ukončení úloh, musíte migrovat úlohy do Microsoft Fabric nebo ekvivalentního produktu Azure.

Důležitý

Tato funkce je aktuálně ve verzi Preview. Doplňkové podmínky použití pro preview verze Microsoft Azure obsahují další právní podmínky, které se vztahují na funkce Azure, které jsou v beta verzi, v preview verzi nebo ještě nebyly vydány do obecné dostupnosti. Informace o této konkrétní verzi Preview najdete v tématu Azure HDInsight ve službě AKS ve verzi Preview. Pokud máte dotazy nebo návrhy funkcí, odešlete prosím žádost o AskHDInsight s podrobnostmi a sledujte nás o dalších aktualizacích komunity Azure HDInsight.

Tento článek popisuje správu úlohy Flink pomocí rozhraní Azure REST API a kanálu orchestrace dat pomocí Správce orchestrace pracovních postupů služby Azure Data Factory.  Azure Data Factory Workflow Orchestration Manager service je jednoduchý a efektivní způsob, jak vytvářet a spravovat prostředí Apache Airflow, což umožňuje snadné spouštění datových kanálů ve velkém měřítku.

Apache Airflow je opensourcová platforma, která programově vytváří, plánuje a monitoruje složité datové pracovní postupy. Umožňuje definovat sadu úloh označovaných jako operátory, které se dají kombinovat do směrovaných acyklických grafů (DAG), které představují datové kanály.

Následující diagram znázorňuje umístění Airflow, Key Vault a HDInsight na AKS v Azure.

Snímek obrazovky ukazuje umístění toku vzduchu, trezoru klíčů a HDInsight v AKS v Azure.

Na základě rozsahu se vytvoří několik služebních identit Azure, které omezují potřebný přístup a samostatně spravují životní cyklus přihlašovacích údajů klienta.

Doporučuje se pravidelně obměňovat přístupové klíče nebo tajné kódy.

Kroky nastavení

  1. nastavení clusteru Flink

  2. Nahrajte soubor JAR úlohy Flink do účtu úložiště. Může to být primární účet úložiště přidružený ke clusteru Flink nebo jakýkoli jiný účet úložiště. V tomto účtu úložiště byste měli přiřadit roli "Vlastník dat objektu blob úložiště" k MSI přiřazené uživatelem, které se používá pro cluster.

  3. Azure Key Vault – Pokud ho nemáte, můžete postupovat podle tohoto kurzu a vytvořit novou službu Azure Key Vault.

  4. Vytvořte služební principál Microsoft Entra pro přístup ke službě Azure Key Vault – udělte oprávnění pro přístup ke službě Azure Key Vault s rolí "Key Vault Secrets Officer" a poznamenejte si z odpovědi 'appId', 'password' a 'tenant'. Musíme použít totéž pro Airflow, aby úložiště Key Vault sloužilo jako zázemí pro ukládání citlivých informací.

    az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID> 
    
  5. Povolte Azure Key Vault for Workflow Orchestraation Manager k ukládání a správě citlivých informací zabezpečeným a centralizovaným způsobem. Tímto způsobem můžete použít proměnné a připojení, které se automaticky uloží do služby Azure Key Vault. Názvy připojení a proměnných musí mít předponu definovanou proměnnou `variables_prefix` v AIRFLOW__SECRETS__BACKEND_KWARGS. Pokud má například variables_prefix hodnotu jako hdinsight-aks-variables, pak byste pro klíč proměnné hello chtěli uložit proměnnou na hdinsight-aks-variable -hello.

    • Přidejte následující nastavení pro přepsání konfigurace Airflow v integrovaných vlastnostech modulu runtime:

      • AIRFLOW__SECRETS__BACKEND: "airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"

      • AIRFLOW__SECRETS__BACKEND_KWARGS:
        "{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”

    • Do vlastností integrovaného runtime modulu Airflow přidejte následující položku pro konfiguraci proměnných prostředí:

      • AZURE_CLIENT_ID = <App Id from Create Azure AD Service Principal>

      • AZURE_TENANT_ID = <Tenant from Create Azure AD Service Principal>

      • AZURE_CLIENT_SECRET = <Password from Create Azure AD Service Principal>

      Přidejte požadavky na Airflow – apache-airflow-providers-microsoft-azure

      Snímek obrazovky ukazuje konfiguraci toku vzduchu a proměnné prostředí.

  6. Vytvořte instanční objekt Microsoft Entra pro přístup k Azure – Udělte oprávnění pro přístup ke clusteru HDInsight AKS s rolí Přispěvatel, zaznamenejte si appId, heslo a tenant z odpovědi.

    az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>

  7. Ve svém trezoru klíčů vytvořte následující tajné kódy s hodnotou z předchozího ID aplikace instančního objektu SLUŽBY AD, hesla a tenanta s předponou vlastností variables_prefix definovanou v AIRFLOW__SECRETS__BACKEND_KWARGS. Kód DAG má přístup k libovolné z těchto proměnných bez variables_prefix.

    • hdinsight-aks-variables-api-client-id=<App ID from previous step>

    • hdinsight-aks-variables-api-secret=<Password from previous step>

    • hdinsight-aks-variables-tenant-id=<Tenant from previous step>

    from airflow.models import Variable 
    
    def retrieve_variable_from_akv(): 
    
        variable_value = Variable.get("client-id") 
    
        print(variable_value) 
    

Definice DAG

DAG (směrovaný acyklický graf) je základní koncept Airflow, který shromažďuje úkoly, uspořádává je se závislostmi a vztahy a určuje způsob jejich spuštění.

DaG můžete deklarovat třemi způsoby:

  1. Můžete použít správce kontextu, který přidá DAG do čehokoli v něm implicitně.

  2. Můžete použít standardní konstruktor, který předá DAG všem operátorům, které používáte.

  3. Pomocí @dag dekorátoru můžete funkci převést na generátor DAG (z airflow.decorators import dag).

DaG nejsou nic bez úloh ke spuštění a ty jsou ve formě operátorů, senzorů nebo taskflow.

Další podrobnosti o dagech, toku řízení, subDAG, skupinách úloh atd. si můžete přečíst přímo z Apache Airflow. 

Provádění DAG

Ukázkový kód je k dispozici na git; stáhněte kód místně do počítače a nahrajte wordcount.py do úložiště objektů blob. Postupujte podle kroků importu DAG do pracovního postupu vytvořeného během instalace.

Wordcount.py je příkladem orchestrace odesílání úloh Flink pomocí Apache Airflow s HDInsight v AKS. DaG má dva úkoly:

  • získat OAuth Token

  • Použijte rozhraní Azure REST API pro odesílání úloh Flink služby HDInsight k odeslání nové úlohy

Dag očekává mít nastavení služebního principálu, jak je popsáno v průběhu procesu nastavení přihlašovacích údajů klienta OAuth, a předat následující konfigurační vstupy ke spuštění.

Kroky provádění

  1. Spusťte DAG z uživatelského rozhraní Airflow, můžete otevřít uživatelské rozhraní Orchestration Manageru pracovních postupů služby Azure Data Factory kliknutím na ikonu Monitorování.

    snímek obrazovky ukazuje otevření uživatelského rozhraní Správce orchestrace pracovního postupu služby Azure Data Factory kliknutím na ikonu monitoru.

  2. Na stránce "DAGs" vyberte "FlinkWordCountExample" DAG.

    Snímek obrazovky ukazuje vybrání příkladu počítání slov pomocí Flink.

  3. V pravém horním rohu klikněte na ikonu Spustit a vyberte Trigger DAG w/config.

    Snímek obrazovky ukazuje ikonu pro spuštění.

  4. Poskytněte požadovaný konfigurační JSON.

    { 
    
      "jarName":"WordCount.jar", 
    
      "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", 
    
      "subscription":"<cluster subscription id>", 
    
      "rg":"<cluster resource group>", 
    
      "poolNm":"<cluster pool name>", 
    
      "clusterNm":"<cluster name>" 
    
    } 
    
  5. Klikněte na tlačítko Spustit, zahájí se provedení DAGu.

  6. Stav úloh DAG můžete vizualizovat ze spuštění DAG.

    Snímek obrazovky zobrazuje stav úkolu DAG.

  7. Ověření provádění úlohy z portálu

    Snímek obrazovky ukazuje spuštění ověřovací úlohy.

  8. Zkontrolujte úlohu z „řídicího panelu Apache Flink“

    Snímek obrazovky ukazuje řídicí panel Apache Flink.

Ukázkový kód

Toto je příklad orchestrace datového kanálu pomocí Airflow s HDInsight v AKS.

DaG očekává nastavení instančního objektu pro přihlašovací údaje klienta OAuth a předání následující vstupní konfigurace pro spuštění:

{
 'jarName':'WordCount.jar',
 'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net', 
 'subscription':'<cluster subscription id>',
 'rg':'<cluster resource group>', 
 'poolNm':'<cluster pool name>',
 'clusterNm':'<cluster name>'
 }

Odkaz