Definiowanie niestandardowego monitorowania potoków DLT za pomocą haków zdarzeń
Ważny
Obsługa haków zdarzeń jest dostępna w Publicznej Wersji Zapoznawczej.
Można używać haków zdarzeń do dodawania niestandardowych funkcji zwrotnych w języku Python, które są wykonywane, gdy zdarzenia są zapisywane do dziennika zdarzeń potoku DLT . Za pomocą punktów zaczepienia zdarzeń można zaimplementować niestandardowe rozwiązania do monitorowania i zgłaszania alertów. Można na przykład użyć punktów zaczepienia zdarzeń, aby wysyłać wiadomości e-mail lub zapisywać w dzienniku, gdy wystąpią określone zdarzenia lub zintegrować z rozwiązaniami innych firm w celu monitorowania zdarzeń potoku.
Hak zdarzenia definiujesz za pomocą funkcji w Pythonie, która przyjmuje jeden argument, będący słownikiem reprezentującym zdarzenie. Jako część kodu źródłowego potoku, dołączysz następnie haki zdarzeń. Hooki zdarzeń zdefiniowane w potoku będą próbować przetworzyć wszystkie zdarzenia generowane podczas każdej aktualizacji potoku. Jeśli pipeline składa się z wielu artefaktów kodu źródłowego, na przykład wielu notesów, wszystkie zdefiniowane wyzwalacze zdarzeń są stosowane do całego pipeline'u. Mimo że w kodzie źródłowym potoku znajdują się haki zdarzeń, nie są one uwzględnione w grafie potoku.
Można używać uchwytów zdarzeń z potokami, które publikują do metastore Hive lub Unity Catalog.
Notatka
- Język Python jest jedynym językiem obsługiwanym do definiowania hooków zdarzeń. Aby zdefiniować niestandardowe funkcje języka Python, które przetwarzają zdarzenia w potoku zaimplementowanym przy użyciu interfejsu SQL, dodaj funkcje niestandardowe w osobnym notesie języka Python uruchamianym jako część potoku. Funkcje języka Python są stosowane do całego potoku, gdy jest on uruchamiany.
- Haki zdarzeń są wyzwalane tylko w przypadku zdarzeń, w których maturity_level jest
STABLE
. - Haczyki zdarzeń są wykonywane asynchronicznie z aktualizacjami potoku, ale synchronicznie z innymi haczykami zdarzeń. Oznacza to, że tylko jeden punkt zaczepienia zdarzeń jest uruchamiany w danym momencie, a inne haki zdarzeń czekają na uruchomienie do momentu ukończenia aktualnie uruchomionego elementu zaczepienia zdarzeń. Jeśli hak zdarzeń działa w nieskończoność, blokuje wszystkie inne haki zdarzeń.
- DLT próbuje uruchomić każdy hook zdarzenia przy każdym zdarzeniu emitowanym podczas aktualizacji potoku. Aby upewnić się, że opóźnione haki zdarzeń mają czas na przetworzenie wszystkich zdarzeń w kolejce, dlT czeka na niekonfigurowalny stały okres przed zakończeniem obliczeń z uruchomionym potokiem. Nie ma jednak gwarancji, że wszystkie uchwyty są wyzwalane podczas wszystkich zdarzeń przed zakończeniem obliczeń.
Monitorowanie przetwarzania hooków zdarzeń
Użyj typu zdarzenia hook_progress
w dzienniku zdarzeń DLT do monitorowania stanu hooków zdarzeń aktualizacji. Aby zapobiec zależnościom cyklicznym, haki zdarzeń nie są wyzwalane dla zdarzeń hook_progress
.
Haczyk zdarzenia
Aby zdefiniować hook zdarzenia, użyj dekoratora on_event_hook
:
@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
# Python code defining the event hook
Maksymalna liczba kolejnych niepowodzeń zaczepu zdarzenia, po których jest on wyłączany, jest opisana w max_allowable_consecutive_failures
. Awaria haka zdarzeń jest definiowana jako sytuacja, gdy hak zdarzeń zgłasza wyjątek. Jeśli hook zdarzeń jest wyłączony, nie przetwarza nowych zdarzeń aż do ponownego uruchomienia potoku.
max_allowable_consecutive_failures
musi być liczbą całkowitą większą lub równą 0
lub None
. Wartość None
(przypisana domyślnie) oznacza, że nie ma limitu liczby kolejnych niepowodzeń, które mogą wystąpić dla haka zdarzeń, a hak zdarzeń nigdy nie jest wyłączony.
Błędy hooków zdarzeń i ich wyłączanie można monitorować w dzienniku zdarzeń jako zdarzenia typu hook_progress
.
Funkcja haka zdarzeń musi być funkcją języka Python, która akceptuje jeden jedyny parametr, czyli słownikową reprezentację zdarzenia, które uruchomiło ten hak zdarzenia. Każda wartość zwracana z funkcji punktu zaczepienia zdarzeń jest ignorowana.
Przykład: Wybieranie określonych zdarzeń do przetwarzania
W poniższym przykładzie pokazano punkt zaczepienia zdarzeń, który wybiera określone zdarzenia do przetwarzania. W szczególności ten przykład czeka na odebranie zdarzeń z potoku STOPPING
, a następnie zapisuje komunikat do rejestrów dziennika sterownika stdout
.
@on_event_hook
def my_event_hook(event):
if (
event['event_type'] == 'update_progress' and
event['details']['update_progress']['state'] == 'STOPPING'
):
print('Received notification that update is stopping: ', event)
Przykład: wysyłanie wszystkich zdarzeń do kanału usługi Slack
W poniższym przykładzie zaimplementowano hook zdarzeń, który wysyła wszystkie odebrane zdarzenia do kanału Slack, korzystając z API Slacka.
W tym przykładzie użyto tajemnicy Databricks do bezpiecznego przechowywania tokenu wymaganego do uwierzytelnienia w Slack API.
from dlt import on_event_hook
import requests
# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
@on_event_hook
def write_events_to_slack(event):
res = requests.post(
url='https://slack.com/api/chat.postMessage',
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN,
},
json={
'channel': '<channel-id>',
'text': 'Received event:\n' + event,
}
)
Przykład: Skonfiguruj hak zdarzeń, aby wyłączyć go po czterech kolejnych awariach
W poniższym przykładzie pokazano, jak skonfigurować hak zdarzeń, który jest wyłączany, jeśli czterokrotnie z rzędu dojdzie do niepowodzenia.
from dlt import on_event_hook
import random
def run_failing_operation():
raise Exception('Operation has failed')
# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
run_failing_operation()
Przykład: potok DLT z hakiem zdarzeń
W poniższym przykładzie pokazano dodanie hooka zdarzeń do kodu źródłowego pipeline'u. Jest to prosty, ale kompletny przykład użycia haków zdarzeń w potoku.
from dlt import table, on_event_hook, read
import requests
import json
import time
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN
}
# Create a single dataset.
@table
def test_dataset():
return spark.range(5)
# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
'channel': DEV_CHANNEL,
'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
})