Compartir a través de


Definición de la supervisión personalizada de canalizaciones de Delta Live Tables con enlaces de eventos

Importante

La compatibilidad con enlaces de eventos está en versión preliminar pública.

Puede utilizar enlaces de eventos para agregar funciones de devolución de llamada de Python personalizadas que se ejecutan cuando los eventos persisten en el registro de eventos de una canalización de Delta Live Tables. Puede utilizar enlaces de eventos para implementar soluciones personalizadas de supervisión y alertas. Por ejemplo, puede usar enlaces de eventos para enviar correos electrónicos o escribir en un registro cuando se produzcan eventos específicos o para integrarlos con soluciones de terceros para supervisar eventos de canalización.

Se define un enlace de eventos con una función de Python que acepta un único argumento, donde el argumento es un diccionario que representa un evento. A continuación, se incluyen los enlaces de eventos como parte del código fuente de una canalización. Los enlaces de eventos definidos en una canalización intentarán procesar todos los eventos generados durante cada actualización de canalización. Si la canalización se compone de varios artefactos de código fuente, por ejemplo, varios cuadernos, los enlaces de eventos definidos se aplican a toda la canalización. Aunque los enlaces de eventos se incluyen en el código fuente de la canalización, no se incluyen en el gráfico de canalización.

Puede usar enlaces de eventos con canalizaciones que publiquen en el metastore de Hive o en el catálogo de Unity.

Nota:

  • Python es el único lenguaje admitido para definir enlaces de eventos. Para definir funciones personalizadas de Python que procesan eventos en una canalización implementada mediante la interfaz SQL, agregue las funciones personalizadas en un cuaderno de Python independiente que se ejecute como parte de la canalización. Las funciones de Python se aplican a toda la canalización cuando se ejecuta la canalización.
  • Los enlaces de eventos solo se desencadenan para eventos en los que el valor maturity_level sea STABLE.
  • Los enlaces de eventos se ejecutan de forma asincrónica desde actualizaciones de canalización, pero sincrónicamente con otros enlaces de eventos. Esto significa que solo se ejecuta un enlace de evento a la vez, y otros enlaces de eventos esperan para ejecutarse hasta que se completa el enlace de evento que se está ejecutando actualmente. Si un enlace de eventos se ejecuta indefinidamente, bloquea todos los demás enlaces de eventos.
  • Delta Live Tables intenta ejecutar cada enlace de eventos en cada evento emitido durante una actualización de canalización. Para ayudar a garantizar que los enlaces de eventos retrasados tengan tiempo para procesar todos los eventos en cola, Delta Live Tables espera un período fijo no configurable antes de finalizar el proceso que ejecuta la canalización. Sin embargo, no se garantiza que todos los enlaces se desencadenen en todos los eventos antes de que finalice el proceso.

Supervisión del procesamiento de enlaces de eventos

Use el tipo de evento hook_progress en el registro de eventos de Delta Live Tables para supervisar el estado de los enlaces de eventos de una actualización. Para evitar dependencias circulares, los enlaces de eventos no se desencadenan para eventos hook_progress.

Definición de un enlace de eventos

Para definir un enlace de eventos, use el decorador on_event_hook:

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

El elemento max_allowable_consecutive_failures describe el número máximo de veces consecutivas que un enlace de eventos puede producir un error antes de que se deshabilite. Una error en el enlace de eventos se define como cualquier momento en que el enlace de eventos genera una excepción. Si un enlace de eventos está deshabilitado, no procesa nuevos eventos hasta que se reinicia la canalización.

max_allowable_consecutive_failures debe ser un entero superior o igual a 0 o None. Un valor de None (asignado de forma predeterminada) significa que no hay ningún límite para el número de errores consecutivos permitidos para el enlace de eventos y el enlace de eventos nunca está deshabilitado.

Los errores de enlace de eventos y la deshabilitación de enlaces de eventos se pueden supervisar en el registro de eventos como eventos hook_progress.

La función de enlace de eventos debe ser una función de Python que acepte exactamente un parámetro, una representación de diccionario del evento que desencadenó este enlace de eventos. Se omite cualquier valor devuelto de la función de enlace de eventos.

Ejemplo: Selección de eventos específicos para su procesamiento

En el ejemplo siguiente se muestra un enlace de eventos que selecciona eventos específicos para su procesamiento. En concreto, en este ejemplo se espera hasta que se reciben eventos de la canalización STOPPING y, a continuación, se envía un mensaje a los registros del controlador stdout.

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Ejemplo: Envío de todos los eventos a un canal de Slack

En el ejemplo siguiente se implementa un enlace de eventos que envía todos los eventos recibidos a un canal de Slack mediante la API de Slack.

En este ejemplo se usa un secreto de Databricks para almacenar de forma segura un token necesario para autenticarse en la API de Slack.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Ejemplo: Configuración de un enlace de eventos para deshabilitar después de cuatro errores consecutivos

En el ejemplo siguiente se muestra cómo configurar un enlace de eventos que está deshabilitado si se produce un error consecutivamente cuatro veces.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Ejemplo: Una canalización de Delta Live Tables con un enlace de eventos

En el ejemplo siguiente se muestra cómo agregar un enlace de eventos al código fuente de una canalización. Este es un ejemplo sencillo, pero completo, de uso de enlaces de eventos con una canalización.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })