Orquestração de trabalho do Apache Flink® usando o Gerenciador de Orquestração de Fluxo de Trabalho do Azure Data Factory (alimentado pelo Apache Airflow)
Importante
O Azure HDInsight no AKS se aposentou em 31 de janeiro de 2025. Saiba mais sobre com este comunicado.
Você precisa migrar suas cargas de trabalho para microsoft fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho.
Importante
Esse recurso está atualmente em versão prévia. Os termos de uso complementares para o Microsoft Azure Previews incluem mais termos legais que se aplicam aos recursos do Azure que estão em versão beta, em versão prévia ou ainda não lançados em disponibilidade geral. Para obter informações sobre essa versão prévia específica, consulte Azure HDInsight em informações de visualização do AKS. Para obter perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações sobre da Comunidade do Azure HDInsight.
Este artigo aborda o gerenciamento de um trabalho Flink usando API REST do Azure e pipeline de dados de orquestração com o Gerenciador de Orquestração de Fluxo de Trabalho do Azure Data Factory. serviço do Gerenciador de Orquestração de Fluxo de Trabalho do Azure Data Factory é uma maneira simples e eficiente de criar e gerenciar ambientes de do Apache Airflow, permitindo que você execute pipelines de dados em escala facilmente.
O Apache Airflow é uma plataforma de software livre que cria, agenda e monitora programaticamente fluxos de trabalho de dados complexos. Ele permite que você defina um conjunto de tarefas, chamadas operadores, que podem ser combinadas em grafos acíclicos direcionados (DAGs) para representar pipelines de dados.
O diagrama a seguir mostra o posicionamento de Airflow, Key Vault e HDInsight no AKS no Azure.
Várias Entidades de Serviço do Azure são criadas com base no escopo para limitar o acesso necessário e gerenciar o ciclo de vida da credencial do cliente de forma independente.
É recomendável alternar chaves de acesso ou segredos periodicamente.
Etapas de instalação
configurar de cluster Flink
Faça o upload do arquivo jar do Flink Job para a conta de armazenamento. Pode ser a conta de armazenamento primária associada ao cluster Flink ou a qualquer outra conta de armazenamento, na qual você deve atribuir ao MSI atribuído ao usuário a função "Proprietário de Dados de Blob de Armazenamento" nesta conta de armazenamento usada para o cluster.
Azure Key Vault – Você pode seguir este tutorial para criar um novo Azure Key Vault caso não tenha um.
Crie principal de serviço do Microsoft Entra para acessar o Key Vault – Conceda permissão para acessar o Azure Key Vault com a função "Key Vault Secrets Officer" e anote 'appId', 'password' e 'tenant' da resposta. Precisamos utilizar o mesmo mecanismo para o Airflow usar o Key Vault como solução de back-end para armazenar informações confidenciais.
az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID>
Habilite o Azure Key Vault para o Gerenciador de Orquestração de Fluxos de Trabalho a fim de armazenar e gerenciar suas informações confidenciais de maneira segura e centralizada. Ao fazer isso, você pode usar variáveis e conexões e elas são armazenadas automaticamente no Azure Key Vault. Os nomes das conexões e variáveis precisam ser prefixados por variables_prefix definido em AIRFLOW__SECRETS__BACKEND_KWARGS. Por exemplo, se variables_prefix tiver um valor como hdinsight-aks-variables, para uma chave variável de olá, você desejará armazenar sua Variável em hdinsight-aks-variable -hello.
Adicione as seguintes configurações para as substituições de configuração de fluxo de ar em propriedades de runtime integradas:
AIRFLOW__SECRETS__BACKEND:
"airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"
AIRFLOW__SECRETS__BACKEND_KWARGS:
"{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”
Adicione a seguinte configuração para a configuração de variáveis de ambiente nas propriedades do runtime integrado do Airflow:
AZURE_CLIENT_ID =
<App Id from Create Azure AD Service Principal>
AZURE_TENANT_ID =
<Tenant from Create Azure AD Service Principal>
AZURE_CLIENT_SECRET =
<Password from Create Azure AD Service Principal>
Adicionar requisitos do Airflow - apache-airflow-providers-microsoft-azure
Crie principal de serviço do Microsoft Entra para acessar o Azure – Conceda permissão para acessar o cluster AKS do HDInsight com a função de Colaborador, anote o appId, a senha e o locatário da resposta.
az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>
Crie os seguintes segredos no cofre de chaves usando os valores de appId, senha e locatário da entidade de serviço do AD anterior, cada um prefixado pela propriedade variables_prefix, conforme definido em AIRFLOW__SECRETS__BACKEND_KWARGS. O código DAG pode acessar qualquer uma dessas variáveis sem variables_prefix.
hdinsight-aks-variables-api-client-id=
<App ID from previous step>
hdinsight-aks-variables-api-secret=
<Password from previous step>
hdinsight-aks-variables-tenant-id=
<Tenant from previous step>
from airflow.models import Variable def retrieve_variable_from_akv(): variable_value = Variable.get("client-id") print(variable_value)
Definição de DAG
Um DAG (Grafo Acíclico Direcionado) é o conceito central do Airflow, coletando tarefas em conjunto, organizadas com dependências e relações para indicar como devem ser executadas.
Há três maneiras de declarar um DAG:
Você pode usar um gerenciador de contexto, que adiciona o DAG a qualquer coisa dentro dele implicitamente
Você pode usar um construtor padrão, passando o DAG para todos os operadores usados
Você pode usar o decorador @dag para transformar uma função em um gerador de DAG (de airflow.decorators import dag)
OS DAGs não são nada sem Tarefas a serem executadas e eles vêm na forma de Operadores, Sensores ou TaskFlow.
Você pode ler mais detalhes sobre DAGs, Fluxo de Controle, SubDAGs, Grupos de Tarefas etc. diretamente de Apache Airflow.
Execução do DAG
O código de exemplo está disponível no git; baixe o código localmente em seu computador e carregue o wordcount.py em um armazenamento de blobs. Siga as etapas de para importar o DAG para o fluxo de trabalho criado durante a instalação.
O wordcount.py é um exemplo de orquestração de um envio de trabalho Flink usando o Apache Airflow com HDInsight no AKS. O DAG tem duas tarefas:
obter
OAuth Token
Invocar a API REST do Azure de Envio de Trabalho Flink do HDInsight para enviar um novo trabalho
O DAG espera estar configurado para o Principal do Serviço, conforme descrito durante o processo de instalação da credencial do Cliente OAuth, e passar a seguinte configuração de entrada para a execução.
Etapas de execução
Execute a DAG na interface do Airflow, você pode abrir a interface do usuário do Gerenciador de Orquestração de Fluxo de Trabalho do Azure Data Factory clicando no ícone de monitoramento.
Selecione o DAG "FlinkWordCountExample" na página "DAGs".
Clique no ícone "executar" no canto superior direito e selecione "Acionar DAG com configuração".
Passe o JSON de configuração necessário
{ "jarName":"WordCount.jar", "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", "subscription":"<cluster subscription id>", "rg":"<cluster resource group>", "poolNm":"<cluster pool name>", "clusterNm":"<cluster name>" }
Clique no botão "Disparar" para iniciar a execução do DAG.
Você pode visualizar o status das tarefas da DAG durante a execução do DAG
Validar a execução do trabalho a partir do portal
Validar o trabalho no "Dashboard do Apache Flink"
Código de exemplo
Este é um exemplo de orquestração de pipeline de dados usando o Airflow com HDInsight no AKS.
O DAG espera configurar o Principal de Serviço para a credencial do cliente OAuth e fornecer a seguinte configuração de entrada para a execução:
{
'jarName':'WordCount.jar',
'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net',
'subscription':'<cluster subscription id>',
'rg':'<cluster resource group>',
'poolNm':'<cluster pool name>',
'clusterNm':'<cluster name>'
}
Referência
- Consulte o código de exemplo .
- site do Apache Flink
- Apache, Apache Airflow, Airflow, Apache Flink, Flink e nomes de projetos de software livre associados são marcas registradas da Apache Software Foundation (ASF).