Orquestração de tarefas do Apache Flink® usando o Azure Data Factory Workflow Orchestration Manager (com tecnologia Apache Airflow)
Importante
O Azure HDInsight no AKS foi desativado em 31 de janeiro de 2025. Saiba mais com este anúncio.
Você precisa migrar suas cargas de trabalho para Microsoft Fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho.
Importante
Esta funcionalidade está atualmente em pré-visualização. Os Termos de Utilização Suplementares para Pré-visualizações do Microsoft Azure incluem mais termos legais que se aplicam a funcionalidades do Azure que estão em versão beta, em pré-visualização ou ainda não liberadas para disponibilidade geral. Para obter informações sobre essa visualização específica, consulte Azure HDInsight no AKS informações de visualização. Para perguntas ou sugestões de funcionalidades, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para mais atualizações na Comunidade do Azure HDInsight.
Este artigo aborda a gestão de um trabalho Flink utilizando a API REST do Azure e a orquestração do pipeline de dados com o Gerenciador de Orquestração de Workflow do Azure Data Factory. serviço Azure Data Factory Workflow Orchestration Manager é uma maneira simples e eficiente de criar e gerir ambientes Apache Airflow, permitindo que se executem pipelines de dados facilmente em escala.
O Apache Airflow é uma plataforma de código aberto que cria, agenda e monitora programaticamente fluxos de trabalho de dados complexos. Ele permite definir um conjunto de tarefas, chamadas operadores, que podem ser combinadas em gráficos acíclicos direcionados (DAGs) para representar pipelines de dados.
O diagrama a seguir mostra o posicionamento do Airflow, Key Vault e HDInsight no AKS no Azure.
Vários Principais de Serviço do Azure são criados com base no escopo para limitar o acesso necessário. Isto permite gerir de forma independente o ciclo de vida das credenciais do cliente.
Recomenda-se alternar as chaves de acesso ou segredos periodicamente.
Etapas de configuração
Carregue o seu arquivo JAR do Flink Job na conta de armazenamento. Pode ser a conta de armazenamento principal associada ao cluster Flink ou qualquer outra conta de armazenamento, onde você deve atribuir a função "Storage Blob Data Owner" ao MSI atribuído pelo usuário usado para o cluster nessa conta de armazenamento.
Azure Key Vault - Você pode seguir este tutorial para criar um novo Azure Key Vault caso ainda não tenha um.
Criar principal de serviço do Microsoft Entra para aceder ao Cofre de Chaves – Conceder permissão para aceder ao Cofre de Chaves do Azure com a função "Responsável pelos Segredos do Cofre" e tomar nota de ‘appId’, ‘senha’ e ‘locatário’ da resposta. Precisamos utilizar o mesmo para que o Airflow use o armazenamento do Key Vault como back-ends para guardar informações confidenciais.
az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID>
Habilite Azure Key Vault for Workflow Orchestration Manager para armazenar e gerenciar suas informações confidenciais de forma segura e centralizada. Ao fazer isso, você pode usar variáveis e conexões, e elas serão armazenadas automaticamente no Cofre da Chave do Azure. Os nomes das conexões e variáveis precisam ser prefixados pelo variables_prefix definido em AIRFLOW__SECRETS__BACKEND_KWARGS. Por exemplo, se variables_prefix tiver um valor como hdinsight-aks-variables, então para uma chave variável de hello, convém armazenar sua variável em hdinsight-aks-variable -hello.
Adicione as seguintes configurações para as substituições de configuração de fluxo de ar nas propriedades de tempo de execução integradas:
AIRFLOW__SECRETS__BACKEND:
"airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"
AIRFLOW__SECRETS__BACKEND_KWARGS:
"{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”
Adicione a seguinte configuração para a configuração de variáveis de ambiente nas propriedades de tempo de execução integrado do Airflow:
AZURE_CLIENT_ID =
<App Id from Create Azure AD Service Principal>
AZURE_TENANT_ID =
<Tenant from Create Azure AD Service Principal>
AZURE_CLIENT_SECRET =
<Password from Create Azure AD Service Principal>
Adicionar requisitos do Airflow - apache-airflow-providers-microsoft-azure
Criar principal de serviço do Microsoft Entra para acessar o Azure – Conceda permissão para acessar o Cluster AKS do HDInsight com a função de Colaborador, anote appId, senha e locatário da resposta.
az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>
Crie os segredos a seguir no seu cofre de chaves usando o valor do appId, palavra-passe e locatário da entidade principal de serviço AD anterior, prefixados pela variável de propriedade variables_prefix definida em AIRFLOW__SECRETS__BACKEND_KWARGS. O código DAG pode acessar qualquer uma dessas variáveis sem variables_prefix.
hdinsight-aks-variables-api-client-id=
<App ID from previous step>
hdinsight-aks-variables-api-secret=
<Password from previous step>
hdinsight-aks-variables-tenant-id=
<Tenant from previous step>
from airflow.models import Variable def retrieve_variable_from_akv(): variable_value = Variable.get("client-id") print(variable_value)
Definição de DAG
Uma DAG (Directed Acyclic Graph) é o conceito central do Airflow, agregando tarefas, organizadas com dependências e relacionamentos, indicando como devem ser executadas.
Há três maneiras de declarar um DAG:
Você pode usar um gerenciador de contexto, que adiciona o DAG a qualquer coisa dentro dele implicitamente
Você pode usar um construtor padrão, passando o DAG para qualquer operador que você usar
Pode usar o decorador @dag para transformar uma função num gerador de DAGs (a partir de airflow.decorators import dag)
DAGs não são nada sem Tarefas para executar, e eles vêm na forma de Operadores, Sensores ou TaskFlow.
Você pode ler mais detalhes sobre DAGs, Control Flow, SubDAGs, TaskGroups, etc. diretamente do Apache Airflow.
Execução do DAG
Exemplo de código está disponível no git; Faça o download do código para o seu computador e carregue o wordcount.py para um armazenamento Blob. Siga as etapas para importar o DAG para o seu fluxo de trabalho que foi criado durante a configuração.
O wordcount.py é um exemplo de orquestração de um envio de trabalho Flink usando Apache Airflow com HDInsight no AKS. O DAG tem duas tarefas:
obter
OAuth Token
Invoque a API REST do Azure para Submissão de Trabalho Flink do HDInsight para enviar um novo trabalho
O DAG espera ter a configuração da entidade de serviço, conforme descrito durante o processo de configuração para a credencial do cliente OAuth, e passar a seguinte configuração de entrada durante a execução.
Etapas de execução
Execute o DAG a partir do Airflow UI. Pode-se abrir a UI do Azure Data Factory Workflow Orchestration Manager clicando no ícone de Monitorização.
Selecione o DAG "FlinkWordCountExample" na página "DAGs".
Clique no ícone "executar" no canto superior direito e selecione "Trigger DAG w/ config".
Fornecer a configuração necessária em JSON
{ "jarName":"WordCount.jar", "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", "subscription":"<cluster subscription id>", "rg":"<cluster resource group>", "poolNm":"<cluster pool name>", "clusterNm":"<cluster name>" }
Clique no botão "Acionar" e começa a execução do DAG.
Você pode visualizar o status das tarefas do DAG a partir da execução do DAG
Validar a execução do trabalho a partir do portal
Valide o trabalho a partir do "Apache Flink Dashboard"
Código de exemplo
Este é um exemplo de orquestração de pipeline de dados usando o Airflow com o HDInsight no AKS.
O DAG espera ter a configuração do Principal de Serviço para a credencial do Cliente OAuth e passar a configuração de entrada seguinte para a execução:
{
'jarName':'WordCount.jar',
'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net',
'subscription':'<cluster subscription id>',
'rg':'<cluster resource group>',
'poolNm':'<cluster pool name>',
'clusterNm':'<cluster name>'
}
Referência
- Consulte o código de exemplo .
- Website Apache Flink
- Apache, Apache Airflow, Airflow, Apache Flink, Flink e nomes de projetos de código aberto associados são marcas comerciais da Apache Software Foundation (ASF).