Orquestração de tarefas do Apache Flink® usando o Azure Data Factory Workflow Orchestration Manager (com tecnologia Apache Airflow)
Nota
Vamos desativar o Azure HDInsight no AKS em 31 de janeiro de 2025. Antes de 31 de janeiro de 2025, você precisará migrar suas cargas de trabalho para o Microsoft Fabric ou um produto equivalente do Azure para evitar o encerramento abrupto de suas cargas de trabalho. Os clusters restantes na sua subscrição serão interrompidos e removidos do anfitrião.
Apenas o apoio básico estará disponível até à data da reforma.
Importante
Esta funcionalidade está atualmente em pré-visualização. Os Termos de Utilização Suplementares para Pré-visualizações do Microsoft Azure incluem mais termos legais que se aplicam a funcionalidades do Azure que estão em versão beta, em pré-visualização ou ainda não disponibilizadas para disponibilidade geral. Para obter informações sobre essa visualização específica, consulte Informações de visualização do Azure HDInsight no AKS. Para perguntas ou sugestões de recursos, envie uma solicitação no AskHDInsight com os detalhes e siga-nos para obter mais atualizações na Comunidade do Azure HDInsight.
Este artigo aborda o gerenciamento de um trabalho Flink usando a API REST do Azure e o pipeline de dados de orquestração com o Gerenciador de Orquestração de Fluxo de Trabalho do Azure Data Factory. O serviço Azure Data Factory Workflow Orchestration Manager é uma maneira simples e eficiente de criar e gerenciar ambientes Apache Airflow , permitindo que você execute pipelines de dados em escala facilmente.
O Apache Airflow é uma plataforma de código aberto que cria, agenda e monitora programaticamente fluxos de trabalho de dados complexos. Ele permite definir um conjunto de tarefas, chamadas operadores, que podem ser combinadas em gráficos acíclicos direcionados (DAGs) para representar pipelines de dados.
O diagrama a seguir mostra o posicionamento do Airflow, Key Vault e HDInsight no AKS no Azure.
Várias Entidades de Serviço do Azure são criadas com base no escopo para limitar o acesso necessário e gerenciar o ciclo de vida da credencial do cliente de forma independente.
Recomenda-se girar as chaves de acesso ou segredos periodicamente.
Passos de configuração
Carregue seu frasco Flink Job para a conta de armazenamento. Pode ser a conta de armazenamento principal associada ao cluster Flink ou qualquer outra conta de armazenamento, onde você deve atribuir a função "Storage Blob Data Owner" ao MSI atribuído pelo usuário usado para o cluster nessa conta de armazenamento.
Azure Key Vault - Você pode seguir este tutorial para criar um novo Azure Key Vault no caso, se você não tiver um.
Criar entidade de serviço do Microsoft Entra para acessar o Cofre da Chave – Conceda permissão para acessar o Cofre da Chave do Azure com a função "Responsável pelos Segredos do Cofre da Chave" e anote 'appId', 'senha' e 'locatário' na resposta. Precisamos usar o mesmo para o Airflow para usar o armazenamento do Key Vault como back-ends para armazenar informações confidenciais.
az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID>
Habilite o Azure Key Vault for Workflow Orchestration Manager para armazenar e gerenciar suas informações confidenciais de forma segura e centralizada. Ao fazer isso, você pode usar variáveis e conexões, e elas serão armazenadas automaticamente no Cofre da Chave do Azure. O nome das conexões e variáveis precisa ser prefixado por variables_prefix definido em AIRFLOW__SECRETS__BACKEND_KWARGS. Por exemplo, se variables_prefix tiver um valor como hdinsight-aks-variables, então para uma chave variável de hello, convém armazenar sua variável em hdinsight-aks-variable -hello.
Adicione as seguintes configurações para as substituições de configuração de fluxo de ar nas propriedades de tempo de execução integradas:
AIRFLOW__SECRETS__BACKEND:
"airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"
AIRFLOW__SECRETS__BACKEND_KWARGS:
"{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”
Adicione a seguinte configuração para a configuração de variáveis de ambiente nas propriedades de tempo de execução integrado do Airflow:
AZURE_CLIENT_ID =
<App Id from Create Azure AD Service Principal>
AZURE_TENANT_ID =
<Tenant from Create Azure AD Service Principal>
AZURE_CLIENT_SECRET =
<Password from Create Azure AD Service Principal>
Adicionar requisitos de fluxo de ar - apache-airflow-providers-microsoft-azure
Criar entidade de serviço do Microsoft Entra para acessar o Azure – Conceda permissão para acessar o Cluster AKS do HDInsight com a função de Colaborador, anote appId, senha e locatário na resposta.
az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>
Crie os seguintes segredos no seu cofre de chaves com o valor do appId, palavra-passe e inquilino da entidade principal do Serviço AD anterior, prefixado por variables_prefix de propriedade definido em AIRFLOW__SECRETS__BACKEND_KWARGS. O código DAG pode acessar qualquer uma dessas variáveis sem variables_prefix.
hdinsight-aks-variables-api-client-id=
<App ID from previous step>
hdinsight-aks-variables-api-secret=
<Password from previous step>
hdinsight-aks-variables-tenant-id=
<Tenant from previous step>
from airflow.models import Variable def retrieve_variable_from_akv(): variable_value = Variable.get("client-id") print(variable_value)
Definição de DAG
Um DAG (Directed Acyclic Graph) é o conceito central do fluxo de ar, coletando tarefas juntas, organizadas com dependências e relacionamentos para dizer como elas devem ser executadas.
Há três maneiras de declarar um DAG:
Você pode usar um gerenciador de contexto, que adiciona o DAG a qualquer coisa dentro dele implicitamente
Você pode usar um construtor padrão, passando o DAG para qualquer operador que você usar
Você pode usar o decorador @dag para transformar uma função em um gerador DAG (de airflow.decorators import dag)
DAGs não são nada sem Tarefas para executar, e eles vêm na forma de Operadores, Sensores ou TaskFlow.
Você pode ler mais detalhes sobre DAGs, Control Flow, SubDAGs, TaskGroups, etc. diretamente do Apache Airflow.
Execução do DAG
O código de exemplo está disponível no git, baixe o código localmente no seu computador e carregue o wordcount.py para um armazenamento de blob. Siga as etapas para importar o DAG para seu fluxo de trabalho criado durante a instalação.
O wordcount.py é um exemplo de orquestração de um envio de trabalho Flink usando Apache Airflow com HDInsight no AKS. O DAG tem duas tarefas:
Obter
OAuth Token
Invoque o Envio de Trabalho Flink do HDInsight API REST do Azure para enviar um novo trabalho
O DAG espera ter a configuração para a entidade de serviço, conforme descrito durante o processo de instalação para a credencial do cliente OAuth e passar a seguinte configuração de entrada para a execução.
Etapas de execução
Execute o DAG a partir da IU do Airflow, pode abrir a IU do Azure Data Factory Workflow Orchestration Manager clicando no ícone Monitor.
Selecione o DAG "FlinkWordCountExample" na página "DAGs".
Clique no ícone "executar" no canto superior direito e selecione "Trigger DAG w/ config".
Passar a configuração necessária JSON
{ "jarName":"WordCount.jar", "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", "subscription":"<cluster subscription id>", "rg":"<cluster resource group>", "poolNm":"<cluster pool name>", "clusterNm":"<cluster name>" }
Clique no botão "Gatilho", ele inicia a execução do DAG.
Você pode visualizar o status das tarefas do DAG a partir da execução do DAG
Validar a execução do trabalho a partir do portal
Valide o trabalho a partir do "Apache Flink Dashboard"
Código de exemplo
Este é um exemplo de orquestração de pipeline de dados usando o fluxo de ar com o HDInsight no AKS.
O DAG espera ter a configuração da Entidade de Serviço para a credencial do Cliente OAuth e passar a seguinte configuração de entrada para a execução:
{
'jarName':'WordCount.jar',
'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net',
'subscription':'<cluster subscription id>',
'rg':'<cluster resource group>',
'poolNm':'<cluster pool name>',
'clusterNm':'<cluster name>'
}
Referência
- Consulte o código de exemplo.
- Site do Apache Flink
- Apache, Apache Airflow, Airflow, Apache Flink, Flink e nomes de projetos de código aberto associados são marcas comerciais da Apache Software Foundation (ASF).