Orkiestracja zadań Apache Flink® z wykorzystaniem Menedżera Orkiestracji Przepływu Pracy usługi Azure Data Factory (zasilanego przez Apache Airflow)
Ważny
Usługa Azure HDInsight w usłudze AKS została wycofana 31 stycznia 2025 r. Dowiedz się więcej dzięki temu ogłoszeniu.
Aby uniknąć nagłego kończenia obciążeń, należy przeprowadzić migrację obciążeń do usługi Microsoft Fabric lub równoważnego produktu platformy Azure.
Ważny
Ta funkcja jest obecnie dostępna w wersji zapoznawczej. Dodatkowe warunki użytkowania dla Microsoft Azure Previews zawierają więcej warunków prawnych dotyczących funkcji platformy Azure, które znajdują się w wersji beta, wersji zapoznawczej lub nie są jeszcze dostępne ogólnie. Aby uzyskać informacje na temat tej konkretnej wersji zapoznawczej, zobacz informacje o wersji zapoznawczej Azure HDInsight na AKS. W przypadku pytań lub sugestii dotyczących funkcji prześlij żądanie na AskHDInsight, a także śledź nas, aby uzyskać więcej aktualizacji na temat Społeczności Azure HDInsight.
W tym artykule omówiono zarządzanie zadaniem Flink przy użyciu interfejsu API REST platformy Azure oraz orkiestrację przepływu danych za pomocą Azure Data Factory Workflow Orchestration Manager. usługi Azure Data Factory Workflow Orchestration Manager to prosty i wydajny sposób tworzenia środowisk Apache Airflow i zarządzania nimi, co umożliwia łatwe uruchamianie potoków danych na dużą skalę.
Apache Airflow to platforma typu open source, która programowo tworzy, planuje i monitoruje złożone przepływy pracy danych. Umożliwia zdefiniowanie zestawu zadań nazywanych operatorami, które można połączyć w skierowane grafy acykliczne (DAG) do reprezentowania potoków danych.
Na poniższym diagramie przedstawiono rozmieszczenie usług Airflow, Key Vault i HDInsight w usłudze AKS na platformie Azure.
Wiele zasad dostępu do usługi Azure jest tworzonych w oparciu o zakres, aby ograniczyć wymagany dostęp i niezależnie zarządzać cyklem życia poświadczeń klienta.
Zaleca się okresowe zmienianie kluczy dostępu lub tajnych informacji.
Kroki konfiguracji
Przekaż plik JAR zadania Flink do konta magazynowego. Może to być podstawowe konto magazynu związane z klastrem Flink lub dowolne inne konto magazynu, do którego należy przypisać rolę "Właściciel danych blob magazynu" dla przypisanej przez użytkownika tożsamości usługi zarządzanej, używanej w przypadku tego klastra na danym koncie magazynu.
Azure Key Vault — możesz skorzystać z tego samouczka, aby utworzyć nowy Azure Key Vault, jeśli go nie masz.
Utwórz jednostkę usługi Microsoft Entra , aby uzyskać dostęp do Key Vault — udziel uprawnień dostępu do usługi Azure Key Vault za pomocą roli „Key Vault Secrets Officer” i zanotuj „appId”, „password” i „tenant” z odpowiedzi. Musimy użyć tego samego w Airflow, aby wykorzystać magazyn Key Vault jako zaplecze do przechowywania poufnych informacji.
az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID>
Włącz usługę Azure Key Vault for Workflow Orchestration Manager, aby przechowywać poufne informacje i zarządzać nimi w bezpieczny i scentralizowany sposób. Dzięki temu można używać zmiennych i połączeń, a także automatycznie przechowywać je w usłudze Azure Key Vault. Nazwa połączeń i zmiennych musi być poprzedzona prefiksem variables_prefix zdefiniowanym w AIRFLOW__SECRETS__BACKEND_KWARGS. Jeśli na przykład variables_prefix ma wartość hdinsight-aks-variables, wówczas dla klucza zmiennej hello należy przechowywać zmienną w zmiennej hdinsight-aks-variable -hello.
Dodaj następujące ustawienia dla przesłonięć konfiguracji airflow we właściwościach zintegrowanego środowiska uruchomieniowego:
AIRFLOW__SECRETS__BACKEND:
"airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"
AIRFLOW__SECRETS__BACKEND_KWARGS:
"{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”
Dodaj następujące ustawienie dla konfiguracji zmiennych środowiskowych we właściwościach zintegrowanego środowiska uruchomieniowego airflow:
AZURE_CLIENT_ID =
<App Id from Create Azure AD Service Principal>
AZURE_TENANT_ID =
<Tenant from Create Azure AD Service Principal>
AZURE_CLIENT_SECRET =
<Password from Create Azure AD Service Principal>
Dodaj wymagania dotyczące Airflow — apache-airflow-providers-microsoft-azure
Utwórz główny obiekt zabezpieczeń Microsoft Entra, aby uzyskać dostęp do platformy Azure. Udziel uprawnień do dostępu do klastra HDInsight AKS z rolą Współtwórca i zanotuj identyfikator appId, hasło oraz identyfikator dzierżawcy z odpowiedzi.
az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>
Utwórz następujące wpisy tajne w magazynie kluczy z wartością z poprzedniej głównej aplikacji usługi AD appId, hasło i tenant, poprzedzonej właściwością variables_prefix zdefiniowaną w AIRFLOW__SECRETS__BACKEND_KWARGS. Kod DAG może uzyskać dostęp do dowolnej z tych zmiennych bez variables_prefix.
hdinsight-aks-variables-api-client-id=
<App ID from previous step>
hdinsight-aks-variables-api-secret=
<Password from previous step>
hdinsight-aks-variables-tenant-id=
<Tenant from previous step>
from airflow.models import Variable def retrieve_variable_from_akv(): variable_value = Variable.get("client-id") print(variable_value)
Definicja języka DAG
DAG (Skierowany graf acykliczny) to podstawowa koncepcja Airflow, która zbiera zadania razem, organizując je z zależnościami i relacjami, określając, jak powinny być uruchamiane.
Istnieją trzy sposoby zadeklarowania DAG-u:
Możesz użyć menedżera kontekstu, który dodaje DAG do wszystkiego wewnątrz niego niejawnie
Można użyć standardowego konstruktora, przekazując grupę DAG do dowolnych operatorów, których używasz
Możesz użyć dekoratora @dag, aby przekształcić funkcję w generator DAG (z airflow.decorators import dag)
DAGi są niczym bez zadań do uruchomienia, a te przyjmują formę operatorów, czujników lub taskflow.
Więcej szczegółowych informacji na temat DAG-ów, przepływu sterowania, subDAG-ów, grup zadań, itp. można znaleźć bezpośrednio w Apache Airflow.
Wykonywanie DAG
Przykładowy kod jest dostępny w git; pobierz kod na swój komputer i przekaż wordcount.py do blob storage. Przeprowadź kroki oznaczone jako i, aby zaimportować DAG do przepływu pracy utworzonego podczas konfiguracji.
Wordcount.py to przykład organizowania przesyłania zadania Flink przy użyciu platformy Apache Airflow z usługą HDInsight w usłudze AKS. Grupa DAG ma dwa zadania:
pobierz
OAuth Token
Wywoływanie przesyłania zadania Flink usługi HDInsight za pomocą interfejsu API REST platformy Azure w celu przesłania nowego zadania
Grupa DAG oczekuje konfiguracji głównej jednostki usługi, zgodnie z opisem z procesu instalacji poświadczeń klienta OAuth. Następnie przekaże następującą konfigurację wejściową do wykonania.
Kroki wykonywania
Wykonaj DAG z interfejsu użytkownika Airflow , możesz otworzyć interfejs użytkownika Azure Data Factory Workflow Orchestration Manager, klikając ikonę Monitor.
Wybierz DAG "FlinkWordCountExample" na stronie "DAGs".
Kliknij ikonę „wykonaj” w prawym górnym rogu i wybierz opcję „Wyzwól DAG z konfiguracją”.
Prześlij wymagany plik JSON konfiguracji
{ "jarName":"WordCount.jar", "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", "subscription":"<cluster subscription id>", "rg":"<cluster resource group>", "poolNm":"<cluster pool name>", "clusterNm":"<cluster name>" }
Kliknij przycisk "Wyzwalacz", aby uruchomić wykonywanie DAG.
Wizualizacja stanu zadań DAG z uruchomienia DAG
Weryfikowanie wykonania zadania z poziomu portalu
Weryfikowanie zadania z poziomu pulpitu nawigacyjnego narzędzia Apache Flink
Przykładowy kod
Jest to przykład orkiestracji potoku danych przy użyciu Airflow z HDInsight na AKS.
Grupa DAG oczekuje przygotowania podmiotu usługi dla poświadczeń klienta OAuth i przekaże następującą konfigurację danych wejściowych do procesu wykonania.
{
'jarName':'WordCount.jar',
'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net',
'subscription':'<cluster subscription id>',
'rg':'<cluster resource group>',
'poolNm':'<cluster pool name>',
'clusterNm':'<cluster name>'
}
Odniesienie
- Zapoznaj się z przykładowym kodem .
- witryna internetowa Apache Flink
- Nazwy projektów typu apache, Apache Airflow, Airflow, Apache Flink, Flink i skojarzone z nimi nazwy projektów typu open source są znakami towarowymiApache Software Foundation (ASF).