Apache Flink-taakindeling® met behulp van Azure Data Factory Workflow Orchestration Manager (mogelijk gemaakt door Apache Airflow)
Belangrijk
Azure HDInsight op AKS is op 31 januari 2025 buiten gebruik gesteld. Meer informatie via deze aankondiging.
U moet uw workloads migreren naar Microsoft Fabric- of een gelijkwaardig Azure-product om plotselinge beëindiging van uw workloads te voorkomen.
Belangrijk
Deze functie is momenteel beschikbaar als preview-versie. De aanvullende gebruiksvoorwaarden voor Microsoft Azure Previews meer juridische voorwaarden bevatten die van toepassing zijn op Azure-functies die bèta, in preview of anderszins nog niet in algemene beschikbaarheid zijn vrijgegeven. Voor meer informatie over deze specifieke preview, zie Azure HDInsight op AKS preview-informatie. Voor vragen of suggesties voor functies dient u een aanvraag in op AskHDInsight- met de details en volgt u ons voor meer updates over Azure HDInsight Community-.
In dit artikel wordt het beheren van een Flink-taak beschreven met behulp van Azure REST API en indelingsgegevenspijplijn met Azure Data Factory Workflow Orchestration Manager. Azure Data Factory Workflow Orchestration Manager service is een eenvoudige en efficiënte manier om Apache Airflow omgevingen te maken en beheren, zodat u gegevenspijplijnen eenvoudig op schaal kunt uitvoeren.
Apache Airflow is een opensource-platform dat programmatisch complexe gegevenswerkstromen maakt, plant en bewaakt. Hiermee kunt u een set taken definiëren, operatoren genoemd die kunnen worden gecombineerd tot omgeleide acyclische grafieken (DAG's) om gegevenspijplijnen weer te geven.
In het volgende diagram ziet u de plaatsing van Airflow, Key Vault en HDInsight in AKS in Azure.
Er worden meerdere Azure-service-principals gecreëerd op basis van de scope om de benodigde toegang te beperken en om de levenscyclus van clientreferenties onafhankelijk te beheren.
Het wordt aanbevolen om regelmatig toegangssleutels of geheimen te roteren.
Installatiestappen
Upload uw Flink Job JAR naar het opslagaccount. Dit kan het primaire opslagaccount zijn dat is gekoppeld aan het Flink-cluster of een ander opslagaccount, waarbij u de rol Opslagblobgegevenseigenaar moet toewijzen aan de door de gebruiker toegewezen MSI die wordt gebruikt voor het cluster in dit opslagaccount.
Azure Key Vault: U kunt deze zelfstudie volgen om een nieuwe Azure Key Vault te maken als u er nog geen hebt.
Maak Microsoft Entra-service-principal om toegang te krijgen tot Key Vault – verleen toestemming voor toegang tot Azure Key Vault met de rol "Key Vault Secrets Officer", en noteer 'appId', 'wachtwoord' en 'tenant' uit de reactie. We moeten hetzelfde gebruiken voor Airflow om Key Vault-opslag te gebruiken als back-ends voor het opslaan van gevoelige informatie.
az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID>
Schakel Azure Key Vault in voor Werkstroomindelingsbeheer om uw gevoelige informatie op een veilige en gecentraliseerde manier op te slaan en te beheren. Hiervoor kunt u variabelen en verbindingen gebruiken en deze automatisch opslaan in Azure Key Vault. De namen van verbindingen en variabelen moeten worden voorafgegaan door de prefix "variables_prefix" zoals gedefinieerd in AIRFLOW__SECRETS__BACKEND_KWARGS. Als variables_prefix bijvoorbeeld een waarde als hdinsight-aks-variables heeft, wilt u de variabele sleutel hello opslaan op hdinsight-aks-variable -hello.
Voeg de volgende instellingen toe voor het aanpassen van de Airflow-configuratie-instellingen in de eigenschappen van de geïntegreerde runtime:
AIRFLOW__SECRETS__BACKEND:
"airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"
AIRFLOW__SECRETS__BACKEND_KWARGS:
"{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”
Voeg de volgende instelling toe voor de configuratie van omgevingsvariabelen in de eigenschappen van de geïntegreerde Airflow-runtime:
AZURE_CLIENT_ID =
<App Id from Create Azure AD Service Principal>
AZURE_TENANT_ID =
<Tenant from Create Azure AD Service Principal>
AZURE_CLIENT_SECRET =
<Password from Create Azure AD Service Principal>
Vereisten voor airflow toevoegen - apache-airflow-providers-microsoft-azure
Maak Microsoft Entra-service-principal aan om toegang tot Azure te krijgen. Verleen toestemming voor toegang tot het HDInsight AKS-cluster met de rol Medewerker, en noteer de appId, het wachtwoord en de tenant uit het antwoord.
az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>
Maak de volgende geheimen aan in uw sleutelkluis met de waarde van de vorige AD-service-principal appId, wachtwoord en tenant, voorafgegaan door de eigenschap variables_prefix die is gedefinieerd in AIRFLOW__SECRETS__BACKEND_KWARGS. De DAG-code heeft toegang tot een van deze variabelen zonder variables_prefix.
hdinsight-aks-variables-api-client-id=
<App ID from previous step>
hdinsight-aks-variables-api-secret=
<Password from previous step>
hdinsight-aks-variables-tenant-id=
<Tenant from previous step>
from airflow.models import Variable def retrieve_variable_from_akv(): variable_value = Variable.get("client-id") print(variable_value)
DAG-definitie
Een DAG (Directed Acyclic Graph) is het kernconcept van Airflow, waarbij taken worden samengebracht en georganiseerd met afhankelijkheden en relaties om aan te geven hoe ze moeten worden uitgevoerd.
Er zijn drie manieren om een DAG te declareren:
U kunt een contextmanager gebruiken, waarmee de DAG impliciet wordt toegevoegd aan alles erin
U kunt een standaardconstructor gebruiken, waarbij de DAG wordt doorgegeven aan alle operators die u gebruikt
U kunt de @dag decorator gebruiken om een functie om te zetten in een DAG generator (uit airflow.decorators import dag)
DAG's zijn niets zonder taken die moeten worden uitgevoerd, en die komen in de vorm van Operators, Sensoren of TaskFlow.
U kunt meer informatie lezen over DAG's, besturingsstroom, subDAG's, taskgroups, enzovoort, rechtstreeks vanuit Apache Airflow-.
DAG-uitvoering
Voorbeeldcode is beschikbaar op de git-; download de code lokaal op uw computer en upload de wordcount.py naar een blobopslag. Volg de stappen om DAG in uw werkstroom te importeren die bij het instellen is gemaakt.
De wordcount.py is een voorbeeld van het organiseren van een Flink-taakverzending met behulp van Apache Airflow met HDInsight in AKS. De DAG heeft twee taken:
OAuth Token
ophalenHDInsight Flink Job Submission Azure REST API aanroepen om een nieuwe taak in te dienen
De DAG verwacht dat het is ingesteld voor de Service Principal, zoals beschreven tijdens het installatieproces voor de OAuth Client-aanmeldingsgegevens, en geeft de volgende invoerconfiguratie door voor de uitvoering.
Uitvoeringsstappen
Voer de DAG uit vanuit de Airflow UI. U kunt de gebruikersinterface van Azure Data Factory Workflow Orchestration Manager openen door op het pictogram Monitor te klikken.
Selecteer de DAG 'FlinkWordCountExample' op de pagina 'DAG's'.
Klik op het pictogram 'Uitvoeren' in de rechterbovenhoek en selecteer 'DAG activeren met configuratie'.
Vereiste configuratie-JSON doorgeven
{ "jarName":"WordCount.jar", "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", "subscription":"<cluster subscription id>", "rg":"<cluster resource group>", "poolNm":"<cluster pool name>", "clusterNm":"<cluster name>" }
Klik op de knop "Trigger", dit start de uitvoering van de DAG.
U kunt de status van DAG-taken visualiseren vanuit de DAG-uitvoering
De taakuitvoering valideren vanuit de portal
Valideer de taak vanuit 'Apache Flink Dashboard'
Voorbeeldcode
Dit is een voorbeeld van het organiseren van een gegevenspijplijn met behulp van Airflow met HDInsight in AKS.
De DAG verwacht dat een service-principal is ingesteld voor de OAuth-clientreferentie en dat de benodigde invoerconfiguratie wordt doorgegeven voor de uitvoering.
{
'jarName':'WordCount.jar',
'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net',
'subscription':'<cluster subscription id>',
'rg':'<cluster resource group>',
'poolNm':'<cluster pool name>',
'clusterNm':'<cluster name>'
}
Referentie
- Raadpleeg de voorbeeldcode.
- Apache Flink Website
- Apache, Apache Airflow, Airflow, Apache Flink, Flink en bijbehorende opensource-projectnamen zijn handelsmerken van de Apache Software Foundation (ASF).