Ejecución de una canalización de Delta Live Tables en un flujo de trabajo
Puede ejecutar una canalización de Delta Live Tables como parte de un flujo de trabajo de procesamiento de datos con trabajos de Databricks, Apache Airflow o Azure Data Factory.
Trabajos
Puede organizar varias tareas en un trabajo de Databricks para implementar un flujo de trabajo de procesamiento de datos. Si desea incluir una canalización de Delta Live Tables en un trabajo, utilice la tarea Canalización al crear un trabajo. Consulte Tarea de canalizaciones de Delta Live Tables para trabajos.
Airflow de Apache
Apache Airflow es una solución de código abierto para administrar y programar flujos de trabajo de datos. Airflow representa los flujos de trabajo como gráficos acíclicos dirigidos (DAG) de operaciones. El flujo de trabajo se define en un archivo de Python y Airflow administra la programación y la ejecución. Para obtener más información sobre cómo instalar y usar Airflow con Azure Databricks, consulte Orquestación de trabajos de Azure Databricks con Apache Airflow.
Para ejecutar una canalización de Delta Live Tables como parte de un flujo de trabajo de Airflow, utilice DatabricksSubmitRunOperator.
Requisitos
Para que Airflow sea compatible con Delta Live Tables, necesita lo siguiente:
- Airflow, versión 2.1.0 o posterior.
- El paquete del proveedor de Databricks, versión 2.1.0 o posterior.
Ejemplo
En el ejemplo siguiente, se crea un DAG de Airflow que desencadena una actualización para la canalización de Delta Live Tables con el identificador 8279d543-063c-4d63-9926-dae38e35ce8b
:
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow'
}
with DAG('dlt',
start_date=days_ago(2),
schedule_interval="@once",
default_args=default_args
) as dag:
opr_run_now=DatabricksSubmitRunOperator(
task_id='run_now',
databricks_conn_id='CONNECTION_ID',
pipeline_task={"pipeline_id": "8279d543-063c-4d63-9926-dae38e35ce8b"}
)
Reemplace CONNECTION_ID
por el identificador de una conexión de Airflow al área de trabajo.
Guarde este ejemplo en el directorio airflow/dags
y utilice la interfaz de usuario de Airflow para ver y desencadenar el DAG. Utilice la interfaz de usuario de Delta Live Tables para ver los detalles de la actualización de la canalización.
Azure Data Factory
Azure Data Factory es un servicio ETL basado en la nube que le permite organizar flujos de trabajo de integración y transformación de datos. Azure Data Factory admite directamente la ejecución de tareas de Azure Databricks en un flujo de trabajo, incluidos cuadernos, tareas JAR y scripts de Python. También puede incluir una canalización en un flujo de trabajo; para ello, llame a la API de Delta Live Tables desde una actividad web de Azure Data Factory. Por ejemplo, para desencadenar una actualización de canalización desde Azure Data Factory:
Cree una factoría de datos o abra una existente.
Cuando se complete la creación, abra la página de la factoría de datos y haga clic en el icono Abrir Azure Data Factory Studio. Aparecerá la interfaz de usuario de Azure Data Factory.
Para crear una canalización de Azure Data Factory, seleccione Canalización en el menú desplegable Nuevo de la interfaz de usuario de Azure Data Factory Studio.
En el cuadro de herramientas Actividades, expanda General y arrastre la actividad Web al lienzo de la canalización. Haga clic en la pestaña Configuración y escriba los valores siguientes:
Nota:
Como procedimiento recomendado de seguridad, cuando se autentique con herramientas, sistemas, scripts y aplicaciones automatizados, Databricks recomienda usar los tokens de acceso personal pertenecientes a las entidades de servicio en lugar de a los usuarios del área de trabajo. Para crear tókenes para entidades de servicio, consulte Administración de tokens de acceso para una entidad de servicio.
URL:
https://<databricks-instance>/api/2.0/pipelines/<pipeline-id>/updates
.Reemplace
<get-workspace-instance>
.Reemplace
<pipeline-id>
por el identificador de la canalización.Método: seleccione POST en el menú desplegable.
Encabezados: haga clic en + Nuevo. En el cuadro de texto Nombre, escriba
Authorization
. En el cuadro de texto Valor, escribaBearer <personal-access-token>
.Reemplace
<personal-access-token>
por un token de acceso personal de Azure Databricks.Cuerpo: para pasar parámetros adicionales de solicitud, especifique un documento JSON que contenga los parámetros. Por ejemplo, para iniciar una actualización y volver a procesar todos los datos de la canalización:
{"full_refresh": "true"}
. Si no hay otros parámetros de solicitud, escriba llaves vacías ({}
).
Para probar la actividad web, haga clic en Depurar en la barra de herramientas de la canalización en la interfaz de usuario de Data Factory. La salida y el estado de la ejecución (incluidos los errores) se muestran en la pestaña Salida de la canalización de Azure Data Factory. Utilice la interfaz de usuario de Delta Live Tables para ver los detalles de la actualización de la canalización.
Sugerencia
Un requisito de flujo de trabajo común es iniciar una tarea después de completar una tarea anterior. Como la solicitud updates
de Delta Live Tables es asincrónica (es decir, la solicitud se devuelve después de iniciar la actualización, pero antes de que esta se complete), las tareas de la canalización de Azure Data Factory que dependan de la actualización de Delta Live Tables deben esperar a que se complete la actualización. Una opción para esperar a que se complete la actualización es agregar una actividad Until después de la actividad web que desencadene la actualización de Delta Live Tables. En la actividad Until:
- Agregue una actividad Wait para esperar un número configurado de segundos a que se complete la actualización.
- Agregue una actividad web después de la actividad Wait que utilice la solicitud Obtención de detalles de actualización de Delta Live Tables para obtener el estado de la actualización. El campo
state
de la respuesta devuelve el estado actual de la actualización, incluido si se completó. - Utilice el valor del campo
state
para establecer la condición de finalización de la actividad Until. También puede utilizar una actividad Establecer variable para agregar una variable de canalización en función del valorstate
y utilizar esta variable para la condición de finalización.