Compartir a través de


Orquestación de trabajos de Azure Databricks con Apache Airflow

En este artículo se describe la compatibilidad de Apache Airflow para orquestar canalizaciones de datos con Azure Databricks, tiene instrucciones para instalar y configurar Airflow localmente y proporciona un ejemplo de implementación y ejecución de un flujo de trabajo de Azure Databricks con Airflow.

Orquestación de trabajos en una canalización de datos

El desarrollo y la implementación de una canalización de procesamiento de datos a menudo requiere la administración de dependencias complejas entre tareas. Por ejemplo, una canalización podría leer datos de un origen, limpiar los datos, transformar los datos limpios y escribir los datos transformados en un destino. También necesita compatibilidad con las pruebas, la programación y la solución de errores al poner en marcha una canalización.

Los sistemas de flujo de trabajo abordan estos desafíos al permitirle definir dependencias entre tareas, programar la ejecución de las canalizaciones y supervisar los flujos de trabajo. Apache Airflow es una solución de código abierto para administrar y programar canalizaciones de datos. Airflow representa las canalizaciones de datos como gráficos acíclicos dirigidos (DAG) de operaciones. El flujo de trabajo se define en un archivo de Python y Airflow administra la programación y la ejecución. La conexión de Azure Databricks y Airflow le permite aprovechar el motor de Spark optimizado que ofrece Azure Databricks con las características de programación de Airflow.

Requisitos

  • La integración entre Airflow y Azure Databricks requiere la versión 2.5.0 y posteriores de Airflow. Los ejemplos de este artículo se prueban con la versión 2.6.1 de Airflow.
  • Airflow requiere Python 3.8, 3.9, 3.10 o 3.11. Los ejemplos de este artículo se prueban con Python 3.8.
  • Las instrucciones de este artículo para instalar y ejecutar Airflow requieren pipenv para crear un entorno virtual de Python.

Operadores de Airflow para Databricks

Un DAG de Airflow se compone de tareas, donde cada tarea ejecuta un operador de Airflow. Los operadores de Airflow que admiten la integración con Databricks se implementan en el proveedor de Databricks.

El proveedor de Databricks incluye operadores para ejecutar varias tareas en un área de trabajo de Azure Databricks, incluida la importación de datos en una tabla, la ejecución de consultas SQL y el trabajo con carpetas de Databricks.

El proveedor de Databricks implementa dos operadores para desencadenar trabajos:

Para crear un nuevo trabajo de Azure Databricks o restablecer un trabajo existente, el proveedor de Databricks implementa DatabricksCreateJobsOperator. DatabricksCreateJobsOperator usa las solicitudes de API POST /api/2.1/jobs/create y POST /api/2.1/jobs/reset. Puede usar DatabricksCreateJobsOperator con DatabricksRunNowOperator para crear y ejecutar un trabajo.

Nota:

El uso de los operadores de Databricks para desencadenar un trabajo requiere proporcionar credenciales en la configuración de conexión de Databricks. Consulte Creación de un token de acceso personal de Azure Databricks para Airflow.

El operador Airflow de Databricks escribe la dirección URL de la página de ejecución del trabajo en los registros de Airflow cada polling_period_seconds (el valor predeterminado es 30 segundos). Para más información, consulte la página del paquete apache-airflow-providers-databricks en el sitio web de Airflow.

Instalación local de la integración de Azure Databricks y Airflow

Para instalar Airflow y el proveedor de Databricks localmente para pruebas y desarrollo, siga estos pasos. Para ver otras opciones de instalación de Airflow, incluida la creación de una instalación de producción, consulte instalación en la documentación de Airflow.

Abra un terminal y ejecute los comandos siguientes:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

Reemplace <firstname>, <lastname> y <email> por su nombre de usuario y correo electrónico. Se le pedirá que escriba una contraseña para el usuario administrador. Asegúrese de guardar esta contraseña porque es necesaria para iniciar sesión en la interfaz de usuario de Airflow.

Este script realiza los pasos siguientes:

  1. Crea un directorio denominado airflow y cambia a ese directorio.
  2. Usa pipenv para crear y generar un entorno virtual de Python. Databricks recomienda usar un entorno virtual de Python para aislar las versiones de paquetes y las dependencias de código en ese entorno. Este aislamiento ayuda a reducir las discrepancias inesperadas en las versiones de los paquetes y las colisiones en las dependencias de código.
  3. Inicializa una variable de entorno llamada AIRFLOW_HOME establecida en la ruta de acceso del directorio airflow.
  4. Instala Airflow y los paquetes del proveedor de Airflow Databricks.
  5. Crea un directorio airflow/dags. Airflow usa el directorio dags para almacenar definiciones de DAG.
  6. Inicializa una base de datos SQLite que Airflow usa para realizar un seguimiento de los metadatos. En una implementación de Airflow de producción, configuraría Airflow con una base de datos estándar. La base de datos SQLite y la configuración predeterminada de la implementación de Airflow se inicializan en el directorio airflow.
  7. Crea un usuario administrador para Airflow.

Sugerencia

Para confirmar la instalación del proveedor de Databricks, ejecute el siguiente comando en el directorio de instalación de Airflow:

airflow providers list

Inicio del programador y el servidor web de Airflow

El servidor web de Airflow es necesario para ver la interfaz de usuario de Airflow. Para iniciar el servidor web, abra un terminal en el directorio de instalación de Airflow y ejecute los siguientes comandos:

Nota:

Si el servidor web de Airflow no se inicia debido a un conflicto de puertos, puede cambiar el puerto predeterminado en la configuración de Airflow.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

El programador es el componente de Airflow que programa los DAG. Para iniciar el programador, abra un nuevo terminal en el directorio de instalación de Airflow y ejecute los siguientes comandos:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Prueba de la instalación de Airflow

Para comprobar la instalación de Airflow, puede ejecutar uno de los DAG de ejemplo incluidos con Airflow:

  1. En una ventana del explorador, abra http://localhost:8080/home. Inicie sesión en la interfaz de usuario de Airflow con el nombre de usuario y la contraseña que creó al instalar Airflow. Aparece la página DAG de Airflow.
  2. Haga clic en el botón de conmutación Pause/Unpause DAG (Pausar/Dejar de pausar DAG) para dejar de pausar uno de los DAG de ejemplo, por ejemplo, example_python_operator.
  3. Para desencadenar el DAG de ejemplo, haga clic en el botón Desencadenar DAG.
  4. Haga clic en el nombre del DAG para ver los detalles, como el estado de ejecución.

Creación de un token de acceso personal de Azure Databricks para Airflow

Airflow se conecta a Databricks mediante un token de acceso personal (PAT) de Azure Databricks. Para crear un PAT, siga los pasos descritos en Tokens de acceso personal de Azure Databricks para los usuarios del área de trabajo.

Nota:

Como procedimiento recomendado de seguridad, cuando se autentique con herramientas, sistemas, scripts y aplicaciones automatizados, Databricks recomienda usar los tokens de acceso personal pertenecientes a las entidades de servicio en lugar de a los usuarios del área de trabajo. Para crear tókenes para entidades de servicio, consulte Administración de tokens de acceso para una entidad de servicio.

También puede autenticarse en Azure Databricks mediante un token de Microsoft Entra ID. Consulte Conexión de Databricks en la documentación de Airflow.

Configuración de una conexión de Azure Databricks

La instalación de Airflow contiene una conexión predeterminada para Azure Databricks. Para actualizar la conexión para conectarse al área de trabajo mediante el token de acceso personal que creó anteriormente:

  1. En una ventana del explorador, abra http://localhost:8080/connection/list/. Si se le pide que inicie sesión, escriba el nombre de usuario y la contraseña del administrador.
  2. En Conn ID (Id. de conexión), busque databricks_default y haga clic en el botón Edit record (Editar registro).
  3. Reemplace el valor del campo Host por el nombre de la instancia del área de trabajo de la implementación de Azure Databricks, por ejemplo, https://adb-123456789.cloud.databricks.com.
  4. En el campo Contraseña, escriba el token de acceso personal de Azure Databricks.
  5. Haga clic en Save(Guardar).

Si usa un token de Microsoft Entra ID, consulte Conexión de Databricks en la documentación de Airflow para obtener información sobre cómo configurar la autenticación.

Ejemplo: Creación de un DAG de Airflow para ejecutar un trabajo de Azure Databricks

En el ejemplo siguiente se muestra cómo crear una implementación sencilla de Airflow que se ejecuta en la máquina local y que implementa un DAG de ejemplo para desencadenar ejecuciones en Azure Databricks. En este ejemplo, hará lo siguiente:

  1. Crear un cuaderno y agregar código para imprimir una felicitación en función de un parámetro configurado.
  2. Crear un trabajo de Azure Databricks con una sola tarea que ejecute el cuaderno.
  3. Configurar una conexión de Airflow al área de trabajo de Azure Databricks.
  4. Crear un DAG de Airflow para desencadenar el trabajo del cuaderno. El DAG se define en un script de Python mediante DatabricksRunNowOperator.
  5. Usar la interfaz de usuario de Airflow para desencadenar el DAG y ver el estado de ejecución.

Creación de un cuaderno

En este ejemplo se usa un cuaderno que contiene dos celdas:

  • La primera contiene un widget de texto de utilidades de Databricks que define una variable llamada greeting establecida en el valor predeterminado world.
  • La segunda imprime el valor de la variable greeting precedida por hello.

Para crear el cuaderno:

  1. Vaya al área de trabajo de Azure Databricks, haga clic en Icono Nuevo Nuevo en la barra lateral y seleccione Notebook.

  2. Asigne un nombre al cuaderno, como Hello Airflow, y asegúrese de que el lenguaje predeterminado está establecido en Python.

  3. Copie el código de Python siguiente y péguelo en la primera celda del cuaderno.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. Agregue una nueva celda debajo de la primera y copie y pegue el siguiente código de Python en la nueva celda:

    print("hello {}".format(greeting))
    

Creación de un trabajo

  1. Haga clic en Icono de flujos de trabajo Flujos de trabajo en la barra lateral.

  2. Haga clic en el botón Crear trabajo.

    Aparece la pestaña Tareas con el cuadro de diálogo Crear tarea.

    Crear diálogo de primera tarea

  3. Reemplace Add a name for your job… (Agregar un nombre para el trabajo…) por el nombre del trabajo.

  4. En el campo Task name (Nombre de tarea), escriba un nombre para la tarea, por ejemplo, greeting-task.

  5. En el menú desplegable Tipo, seleccione Notebook.

  6. En el menú desplegable Origen, seleccione Área de trabajo.

  7. Haga clic en el cuadro de texto Ruta de acceso y use el explorador de archivos para buscar el cuaderno que creó, haga clic en el nombre del cuaderno y haga clic en Confirmar.

  8. Haga clic en Add (Agregar) en Parameters (Parámetros). En el campo Key (Clave), escriba greeting. En el campo Value (Valor), escriba Airflow user.

  9. Haga clic en Create task (Crear tarea).

En el panel Detalles del trabajo, copie el valor del Id. de trabajo. Este valor es necesario para desencadenar el trabajo desde Airflow.

Ejecutar el trabajo

Para probar el nuevo trabajo en la interfaz de usuario de trabajos de Azure Databricks, haga clic en Botón Ejecutar ahora en la esquina superior derecha. Cuando se complete la ejecución, puede comprobar la salida viendo los detalles de la ejecución del trabajo.

Creación de un DAG de Airflow

Defina un DAG de Airflow en un archivo de Python. Para crear un DAG para desencadenar el trabajo de cuaderno de ejemplo:

  1. En un editor de texto o IDE, cree un nuevo archivo llamado databricks_dag.py con el siguiente contenido:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    Reemplace JOB_ID por el valor del identificador de trabajo guardado anteriormente.

  2. Guarde el archivo en el directorio airflow/dags. Airflow lee e instala automáticamente los archivos DAG almacenados en airflow/dags/.

Instalación y comprobación del DAG en Airflow

Para desencadenar y comprobar el DAG en la interfaz de usuario de Airflow:

  1. En una ventana del explorador, abra http://localhost:8080/home. Aparece la pantalla DAGs (DAG) de Airflow.
  2. Busque databricks_dag y haga clic en Pause/Unpause DAG (Pausar/Dejar de pausar DAG) para dejar de pausar el DAG.
  3. Para desencadenar el DAG, haga clic en el botón Desencadenar DAG.
  4. Haga clic en una ejecución en la columna Runs (Ejecuciones) para ver el estado y los detalles de la ejecución.