Orquestación de trabajos de Apache Flink® mediante el administrador de orquestación de flujo de trabajo de Azure Data Factory (con tecnología de Apache Airflow)
Nota:
Retiraremos Azure HDInsight en AKS el 31 de enero de 2025. Antes del 31 de enero de 2025, deberá migrar las cargas de trabajo a Microsoft Fabric o un producto equivalente de Azure para evitar la terminación repentina de las cargas de trabajo. Los clústeres restantes de la suscripción se detendrán y quitarán del host.
Solo el soporte técnico básico estará disponible hasta la fecha de retirada.
Importante
Esta funcionalidad actualmente está en su versión preliminar. En Términos de uso complementarios para las versiones preliminares de Microsoft Azure encontrará más términos legales que se aplican a las características de Azure que están en versión beta, en versión preliminar, o que todavía no se han lanzado con disponibilidad general. Para más información sobre esta versión preliminar específica, consulte la Información de Azure HDInsight sobre la versión preliminar de AKS. Para plantear preguntas o sugerencias sobre la característica, envíe una solicitud en AskHDInsight con los detalles y síganos en la comunidad de Azure HDInsight para obtener más actualizaciones.
En este artículo se describe la administración de un trabajo de Flink mediante la API REST de Azure y la canalización de datos del administrador de orquestación de flujos de trabajo de Azure Data Factory. El servicio del administrador de orquestación de flujos de trabajo de Azure Data Factory es una manera sencilla y eficaz de crear y administrar entornos de Apache Airflow, lo que le permite ejecutar canalizaciones de datos a escala con facilidad.
Apache Airflow es una plataforma de código abierto que crea, programa y supervisa flujos de trabajo de datos complejos mediante programación. Permite definir un conjunto de tareas, llamadas operadores, que se pueden combinar en grafos acíclicos dirigidos (DAG) para representar canalizaciones de datos.
En el diagrama siguiente se muestra la ubicación de Airflow, Key Vault y HDInsight en AKS en Azure.
Se crean varias entidades de servicio de Azure en función del ámbito para limitar el acceso que necesita y administrar el ciclo de vida de credenciales de cliente de forma independiente.
Se recomienda rotar las claves de acceso o los secretos periódicamente.
Configurar pasos
Cargar el archivo jar de trabajo de Flink en la cuenta de almacenamiento. Puede ser la cuenta de almacenamiento principal asociada al clúster de Flink o a cualquier otra cuenta de almacenamiento, donde asigne debe asignar el rol "Propietario de datos de blobs de almacenamiento" al MSI asignado por el usuario que se usa para el clúster a esta cuenta de almacenamiento.
Azure Key Vault: puede seguir este tutorial para crear un nuevo Azure Key Vault en caso de que no tenga uno.
Crear entidad de servicio de Microsoft Entra para acceder a Key Vault – Conceder permiso para acceder a Azure Key Vault con el rol “Responsable de secretos de Key Vault” y tomar nota de 'appId' 'contraseña', y 'inquilino' de la respuesta. Es necesario usar lo mismo para Que Airflow use el almacenamiento de Key Vault como backend para almacenar información confidencial.
az ad sp create-for-rbac -n <sp name> --role “Key Vault Secrets Officer” --scopes <key vault Resource ID>
Habilite Azure Key Vault para el Administrador de orquestación de flujo de trabajo para almacenar y administrar la información confidencial de forma segura y centralizada. Al hacerlo, puede usar variables y conexiones, y se almacenarán automáticamente en Azure Key Vault. El nombre de las conexiones y variables debe tener el prefijo variables_prefix definido en AIRFLOW__SECRETS__BACKEND_KWARGS. Por ejemplo, si variables_prefix tiene un valor como hdinsight-aks-variables, para una clave de variable de hello, querrá almacenar la variable en hdinsight-aks-variable -hello.
Agregue las siguientes opciones para las invalidaciones de configuración de Airflow en las propiedades del entorno de ejecución integrado:
AIRFLOW__SECRETS__BACKEND:
"airflow.providers.microsoft.azure.secrets.key_vault.AzureKeyVaultBackend"
AIRFLOW__SECRETS__BACKEND_KWARGS:
"{"connections_prefix": "airflow-connections", "variables_prefix": "hdinsight-aks-variables", "vault_url": <your keyvault uri>}”
Agregue la siguiente configuración para la configuración de variables de entorno en las propiedades del entorno de ejecución integrado Airflow:
AZURE_CLIENT_ID =
<App Id from Create Azure AD Service Principal>
AZURE_TENANT_ID =
<Tenant from Create Azure AD Service Principal>
AZURE_CLIENT_SECRET =
<Password from Create Azure AD Service Principal>
Adición de requisitos de Airflow: apache-airflow-providers-microsoft-azure
Cree entidad de servicio de Microsoft Entra para acceder a Azure – Grant para acceder al clúster de AKS de HDInsight con el rol Colaborador, anote appId, contraseña e inquilino de la respuesta.
az ad sp create-for-rbac -n <sp name> --role Contributor --scopes <Flink Cluster Resource ID>
Cree los siguientes secretos en el almacén de claves con el valor de appId, contraseña e inquilino de la entidad de servicio de AD anterior, con el prefijo de propiedad variables_prefix definido en AIRFLOW__SECRETS__BACKEND_KWARGS. El código DAG puede acceder a cualquiera de estas variables sin variables_prefix.
hdinsight-aks-variables-api-client-id=
<App ID from previous step>
hdinsight-aks-variables-api-secret=
<Password from previous step>
hdinsight-aks-variables-tenant-id=
<Tenant from previous step>
from airflow.models import Variable def retrieve_variable_from_akv(): variable_value = Variable.get("client-id") print(variable_value)
Definición de DAG
Un DAG (Gráfico Acíclico dirigido) es el concepto principal de Airflow, la recopilación de tareas conjuntamente, organizadas con dependencias y relaciones para decir cómo se deben ejecutar.
Hay tres maneras de declarar un DAG:
Puede usar un administrador de contexto, que agrega el DAG a cualquier elemento dentro de él implícitamente
Puede usar un constructor estándar, pasando el DAG a cualquier operador que use
Puede usar el decorador de @dag para convertir una función en un generador DAG (desde airflow.decorators importar dag)
Los DAG no son nada sin que se ejecuten las tareas, y estos vienen en forma de operadores, sensores o TaskFlow.
Puede leer más detalles sobre DAG, Flujo de control, SubDAG, TaskGroups, etc. directamente desde Apache Airflow.
Ejecución de DAG
El código de ejemplo está disponible en el git; descargue el código localmente en el equipo y cargue el wordcount.py en un almacenamiento de blobs. Siga los pasos para importar DAG en el flujo de trabajo creado durante la instalación.
El wordcount.py es un ejemplo de orquestación de un envío de trabajo de Flink mediante Apache Airflow con HDInsight en AKS. El DAG tiene dos tareas:
get
OAuth Token
Invocación de la API REST de Azure de envío de trabajos de Flink de HDInsight para enviar un nuevo trabajo
DaG espera tener la configuración de la entidad de servicio, como se describe durante el proceso de configuración de la credencial de cliente de OAuth y pasar la siguiente configuración de entrada para la ejecución.
Pasos de ejecución
Ejecute el DAG desde la interfaz de usuario de Airflow; para abrir la interfaz de usuario del Administrador de orquestación de flujo de trabajo de Azure Data Factory, haga clic en el icono Supervisar.
Seleccione el “DAG FlinkWordCountExample” en la “página DAG” .
Haga clic en el icono “ejecutar” de la esquina superior derecha y seleccione “Desencadenar DAG con configuración”.
Paso de JSON de configuración necesario
{ "jarName":"WordCount.jar", "jarDirectory":"abfs://filesystem@<storageaccount>.dfs.core.windows.net", "subscription":"<cluster subscription id>", "rg":"<cluster resource group>", "poolNm":"<cluster pool name>", "clusterNm":"<cluster name>" }
Haga clic en el botón “Desencadenar”, inicia la ejecución del DAG.
Puede visualizar el estado de las tareas de DAG desde la ejecución de DAG
Validación de la ejecución del trabajo desde el portal
Validación del trabajo desde el panel de “Apache Flink”
Ejemplo de código
Este es un ejemplo de orquestación de canalización de datos mediante Airflow con HDInsight en AKS.
DaG espera tener la configuración de la entidad de servicio para la credencial del cliente de OAuth y pasar la siguiente configuración de entrada para la ejecución:
{
'jarName':'WordCount.jar',
'jarDirectory':'abfs://filesystem@<storageaccount>.dfs.core.windows.net',
'subscription':'<cluster subscription id>',
'rg':'<cluster resource group>',
'poolNm':'<cluster pool name>',
'clusterNm':'<cluster name>'
}
Referencia
- Consulte el código de ejemplo.
- Sitio web de Apache Flink
- Apache, Apache Airflow, Airflow, Apache Flink, Flink y los nombres de proyecto de código abierto asociados son marcas comerciales de laApache Software Foundation (ASF).