Condividi tramite


Definire il monitoraggio personalizzato delle pipeline DLT con agganci di eventi

Importante

Il supporto agli hook degli eventi è in anteprima pubblica.

È possibile usare hook degli eventi per aggiungere funzioni di callback Python personalizzate che vengono eseguite quando gli eventi sono salvati in modo permanente nel log degli eventi di una pipeline DLT . È possibile usare hook di eventi per implementare soluzioni personalizzate di monitoraggio e avvisi. Ad esempio, è possibile usare hook di eventi per inviare messaggi di posta elettronica o scrivere in un log quando si verificano eventi specifici o per l'integrazione con soluzioni di terze parti per monitorare gli eventi della pipeline.

Si definisce un hook di eventi con una funzione Python che accetta un singolo argomento, dove l'argomento è un dizionario che rappresenta un evento. Si includono quindi gli hook di eventi come parte del codice sorgente per una pipeline. Qualsiasi hook di eventi definito in una pipeline tenterà di elaborare tutti gli eventi generati durante ogni aggiornamento della pipeline. Se la pipeline è costituita da più artefatti di codice sorgente, ad esempio più notebook, tutti gli hook di eventi definiti vengono applicati all'intera pipeline. Anche se gli hook di eventi sono inclusi nel codice sorgente per la pipeline, non sono inclusi nel grafico della pipeline.

È possibile utilizzare ganci di eventi con pipeline che pubblicano nel metastore Hive o nel Catalogo Unity.

Nota

  • Python è l'unico linguaggio supportato per la definizione di hook di eventi. Per definire funzioni Python personalizzate che elaborano gli eventi in una pipeline implementata usando l'interfaccia SQL, aggiungere le funzioni personalizzate in un notebook Python separato che viene eseguito come parte della pipeline. Le funzioni Python vengono applicate all'intera pipeline quando questa viene eseguita.
  • Gli hook degli eventi vengono attivati solo per gli eventi in cui il maturity_level è STABLE.
  • Gli hook di eventi vengono eseguiti in modo asincrono dagli aggiornamenti della pipeline, ma in modo sincrono con altri hook di eventi. Ciò significa che viene eseguito solo un singolo hook di eventi alla volta e che gli altri hook di eventi attendono di essere eseguiti fino al completamento dell'hook di eventi attualmente in esecuzione. Se un hook eventi viene eseguito per un periodo illimitato, blocca tutti gli altri hook eventi.
  • DLT tenta di eseguire ogni hook di eventi su ogni evento generato durante un aggiornamento della pipeline. Per garantire che gli hook di eventi ritardatari abbiano tempo per elaborare tutti gli eventi in fila, DLT attende un periodo fisso non configurabile prima di terminare il processo computazionale che esegue la pipeline. Tuttavia, non è garantito che tutti gli hook vengano attivati su tutti gli eventi prima che il calcolo venga terminato.

Monitorare l'elaborazione degli hook degli eventi

Usare il tipo di evento hook_progress nel registro eventi DLT per monitorare lo stato degli hook di eventi di un aggiornamento. Per evitare dipendenze circolari, gli hook degli eventi non vengono attivati per gli eventi di hook_progress.

Definire un hook evento

Per definire un hook di eventi, usare l'on_event_hook decorator:

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

Il max_allowable_consecutive_failures descrive il numero massimo di volte consecutive in cui un hook di eventi può avere esito negativo prima che venga disabilitato. Un errore di hook eventi si verifica quando l'hook eventi genera un'eccezione. Se un hook degli eventi è disabilitato, non elabora nuovi eventi fino a quando la pipeline non viene riavviata.

max_allowable_consecutive_failures deve essere un numero intero maggiore o uguale a 0 o None. Un valore di None (assegnato per impostazione predefinita) indica che non esiste alcun limite al numero di errori consecutivi consentiti per l'hook dell'evento e questo non viene mai disattivato.

Gli errori di hook degli eventi e la disabilitazione degli hook degli eventi possono essere monitorati nel registro degli eventi come eventi hook_progress.

La funzione hook dell'evento deve essere una funzione Python che accetta esattamente un parametro, una rappresentazione di un dizionario dell'evento che ha attivato questo hook dell'evento. Qualsiasi valore restituito dalla funzione di hook dell'evento viene ignorato.

Esempio: Selezionare eventi specifici per l'elaborazione

Nell'esempio seguente viene illustrato un hook eventi che seleziona eventi specifici per l'elaborazione. In particolare, questo esempio attende fino a quando non vengono ricevuti gli eventi della pipeline STOPPING e quindi restituisce un messaggio ai log del driver stdout.

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Esempio: Inviare tutti gli eventi a un canale Slack

L'esempio seguente implementa un hook eventi che invia tutti gli eventi ricevuti a un canale Slack usando l'API Slack.

Questo esempio usa un segreto di Databricks per archiviare in modo sicuro un token necessario per l'autenticazione all'API Slack.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Esempio: Configurare un gancio per eventi che si disabilita dopo quattro errori consecutivi

Nell'esempio seguente viene illustrato come configurare un hook dell'evento che è disabilitato se fallisce consecutivamente quattro volte.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Esempio: una pipeline DLT con un collegamento eventi

Nell'esempio seguente si illustra l'aggiunta di un gancio evento al codice sorgente per una pipeline. Si tratta di un esempio semplice ma completo sull'uso di ganci per eventi con una pipeline.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })