Definire il monitoraggio personalizzato delle pipeline di Delta Live Tables con hook di eventi
Importante
Il supporto per gli hook di eventi è disponibile in anteprima pubblica.
È possibile usare
Si definisce un hook di eventi con una funzione Python che accetta un singolo argomento, dove l'argomento è un dizionario che rappresenta un evento. Si includono quindi gli hook di eventi come parte del codice sorgente per una pipeline. Qualsiasi hook di eventi definito in una pipeline tenterà di elaborare tutti gli eventi generati durante ogni aggiornamento della pipeline. Se la pipeline è costituita da più artefatti di codice sorgente, ad esempio più notebook, tutti gli hook di eventi definiti vengono applicati all'intera pipeline. Anche se gli hook di eventi sono inclusi nel codice sorgente per la pipeline, non sono inclusi nel grafico della pipeline.
È possibile utilizzare i ganci degli eventi con pipeline che pubblicano sul metastore Hive o su Unity Catalog.
Nota
- Python è l'unico linguaggio supportato per la definizione di hook di eventi. Per definire funzioni Python personalizzate che elaborano gli eventi in una pipeline implementata usando l'interfaccia SQL, aggiungere le funzioni personalizzate in un notebook Python separato che viene eseguito come parte della pipeline. Le funzioni Python vengono applicate all'intera pipeline quando viene eseguita la pipeline.
- Gli hook eventi vengono attivati solo per gli eventi in cui il maturity_level è
STABLE
. - Gli hook di eventi vengono eseguiti in modo asincrono dagli aggiornamenti della pipeline, ma in modo sincrono con altri hook di eventi. Ciò significa che viene eseguito solo un singolo hook di eventi alla volta e altri hook di eventi attendono l'esecuzione fino al completamento dell'hook eventi attualmente in esecuzione. Se un hook eventi viene eseguito per un periodo illimitato, blocca tutti gli altri hook di eventi.
- Delta Live Tables tenta di eseguire ogni hook su ciascun evento emesso durante l'aggiornamento di una pipeline. Per garantire che gli hook di eventi in ritardo abbiano tempo per elaborare tutti gli eventi in coda, Delta Live Tables attende un periodo fisso non configurabile prima di terminare il calcolo relativo all'esecuzione della pipeline. Tuttavia, non è garantito che tutti gli hook vengano attivati su tutti gli eventi prima che il calcolo venga terminato.
Monitorare l'elaborazione degli hook eventi
Usare il tipo di evento hook_progress
nel registro eventi di Tabelle Live Delta per controllare lo stato degli hook di eventi di un aggiornamento. Per evitare dipendenze circolari, gli hook di eventi non vengono attivati per hook_progress
gli eventi.
Definire un hook eventi
Per definire un hook di eventi, usare l'elemento on_event_hook
Decorator:
@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
# Python code defining the event hook
max_allowable_consecutive_failures
Descrive il numero massimo di volte consecutive in cui un hook di eventi può avere esito negativo prima che venga disabilitato. Un errore di hook eventi viene definito come qualsiasi volta che l'hook eventi genera un'eccezione. Se un hook eventi è disabilitato, non elabora nuovi eventi fino al riavvio della pipeline.
max_allowable_consecutive_failures
deve essere un numero intero maggiore o uguale a 0
o None
. Un valore di None
(assegnato per impostazione predefinita) indica che non esiste alcun limite al numero di errori consecutivi consentiti per l'hook di evento e l'hook di evento non viene mai disabilitato.
Gli errori di hook degli eventi e la disabilitazione degli hook di eventi possono essere monitorati nel registro eventi come hook_progress
eventi.
La funzione hook eventi deve essere una funzione Python che accetta esattamente un parametro, una rappresentazione del dizionario dell'evento che ha attivato questo hook eventi. Qualsiasi valore restituito dalla funzione hook eventi viene ignorato.
Esempio: Selezionare eventi specifici per l'elaborazione
Nell'esempio seguente viene illustrato un hook eventi che seleziona eventi specifici per l'elaborazione. In particolare, questo esempio attende fino a quando non vengono ricevuti gli eventi della pipeline STOPPING
e quindi restituisce un messaggio al driver registra stdout
.
@on_event_hook
def my_event_hook(event):
if (
event['event_type'] == 'update_progress' and
event['details']['update_progress']['state'] == 'STOPPING'
):
print('Received notification that update is stopping: ', event)
Esempio: Inviare tutti gli eventi a un canale Slack
L'esempio seguente implementa un hook eventi che invia tutti gli eventi ricevuti a un canale Slack usando l'API Slack.
Questo esempio usa un segreto databricks per archiviare in modo sicuro un token necessario per l'autenticazione all'API Slack.
from dlt import on_event_hook
import requests
# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
@on_event_hook
def write_events_to_slack(event):
res = requests.post(
url='https://slack.com/api/chat.postMessage',
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN,
},
json={
'channel': '<channel-id>',
'text': 'Received event:\n' + event,
}
)
Esempio: Configurare un hook eventi per disabilitare dopo quattro errori consecutivi
Nell'esempio seguente viene illustrato come configurare un hook eventi disabilitato se ha esito negativo consecutivamente quattro volte.
from dlt import on_event_hook
import random
def run_failing_operation():
raise Exception('Operation has failed')
# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
run_failing_operation()
Esempio: una pipeline Delta Live Tables con un gancio eventi
Nell'esempio seguente viene illustrato l'aggiunta di un hook di eventi al codice sorgente per una pipeline. Si tratta di un esempio semplice ma completo dell'uso di hook di eventi con una pipeline.
from dlt import table, on_event_hook, read
import requests
import json
import time
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN
}
# Create a single dataset.
@table
def test_dataset():
return spark.range(5)
# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
'channel': DEV_CHANNEL,
'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
})