Udostępnij za pośrednictwem


Tworzenie niestandardowego monitorowania potoków dla Delta Live Tables za pomocą haków zdarzeń

Ważne

Obsługa punktów zaczepienia zdarzeń jest dostępna w publicznej wersji zapoznawczej.

Możesz użyć haków zdarzeń, aby dodać niestandardowe funkcje zwrotne w języku Python, które uruchamiane są, gdy zdarzenia są utrwalane w potoku dziennika zdarzeń usługi Delta Live Tables. Za pomocą punktów zaczepienia zdarzeń można zaimplementować niestandardowe rozwiązania do monitorowania i zgłaszania alertów. Można na przykład użyć punktów zaczepienia zdarzeń, aby wysyłać wiadomości e-mail lub zapisywać w dzienniku, gdy wystąpią określone zdarzenia lub zintegrować z rozwiązaniami innych firm w celu monitorowania zdarzeń potoku.

Element zaczepienia zdarzeń definiuje się za pomocą funkcji języka Python, która akceptuje jeden argument, gdzie argument jest słownikiem reprezentującym zdarzenie. Następnie dołączysz punkt zaczepienia zdarzeń jako część kodu źródłowego potoku. Wszelkie haki zdarzeń zdefiniowane w potoku podejmują próbę przetworzenia wszystkich zdarzeń generowanych podczas każdej aktualizacji potoku. Jeśli potok składa się z wielu artefaktów kodu źródłowego, na przykład wielu notesów, wszystkie zdefiniowane haki zdarzeń są stosowane do całego potoku. Mimo że w kodzie źródłowym potoku znajdują się haki zdarzeń, nie są one uwzględnione w grafie potoku.

Można używać haków zdarzeń z potokami publikującymi do metadanych Hive lub Unity Catalog.

Uwaga

  • Język Python jest jedynym językiem obsługiwanym do definiowania punktów zaczepienia zdarzeń. Aby zdefiniować niestandardowe funkcje języka Python, które przetwarzają zdarzenia w potoku zaimplementowanym przy użyciu interfejsu SQL, dodaj funkcje niestandardowe w osobnym notesie języka Python uruchamianym jako część potoku. Funkcje języka Python są stosowane do całego potoku po uruchomieniu potoku.
  • Haczyki zdarzeń są wyzwalane tylko w przypadku zdarzeń, w których poziom_doświadczenia jest STABLE.
  • Zaczepienia zdarzeń są wykonywane asynchronicznie z aktualizacji potoku, ale synchronicznie z innymi punktami zaczepienia zdarzeń. Oznacza to, że tylko jeden punkt zaczepienia zdarzeń jest uruchamiany w danym momencie, a inne haki zdarzeń czekają na uruchomienie do momentu ukończenia aktualnie uruchomionego elementu zaczepienia zdarzeń. Jeśli punkt zaczepienia zdarzeń jest uruchamiany w nieskończoność, blokuje wszystkie inne zaczepienia zdarzeń.
  • Delta Live Tables próbuje uruchomić każdy hak zdarzeń dla każdego zdarzenia emitowanego podczas aktualizacji przepływu danych. Aby zapewnić, że opóźniające się haki zdarzeń mają czas na przetworzenie wszystkich zdarzeń w kolejce, Delta Live Tables oczekuje przez niekonfigurowalny stały okres przed zakończeniem działania obliczeń potoku. Nie ma jednak gwarancji, że wszystkie haki są wyzwalane na wszystkich zdarzeniach przed zakończeniem obliczeń.

Monitorowanie przetwarzania punktów zaczepienia zdarzeń

Użyj typu zdarzenia hook_progress w dzienniku zdarzeń delta Live Tables, aby monitorować stan punktów zaczepienia zdarzeń aktualizacji. Aby zapobiec zależnościom cyklicznym, haki zdarzeń nie są wyzwalane dla hook_progress zdarzeń.

Definiowanie elementu zaczepienia zdarzeń

Aby zdefiniować punkt zaczepienia zdarzeń, użyj dekoratora on_event_hook :

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

Opisuje max_allowable_consecutive_failures maksymalną liczbę kolejnych razy, gdy punkt zaczepienia zdarzeń może zakończyć się niepowodzeniem, zanim zostanie wyłączony. Awaria haka zdarzeń jest definiowana w dowolnym momencie, gdy punkt zaczepienia zdarzeń zgłasza wyjątek. Jeśli punkt zaczepienia zdarzeń jest wyłączony, nie przetwarza nowych zdarzeń do momentu ponownego uruchomienia potoku.

max_allowable_consecutive_failures musi być liczbą całkowitą większą lub równą 0 lub None. Wartość None (ustawiona domyślnie) oznacza, że nie ma limitu na liczbę kolejnych dozwolonych niepowodzeń dla haka zdarzeń, a hak zdarzeń nigdy nie jest wyłączany.

Błędy haka zdarzeń i wyłączanie punktów zaczepienia zdarzeń można monitorować w dzienniku zdarzeń jako hook_progress zdarzenia.

Funkcja haka zdarzeń musi być funkcją języka Python, która akceptuje dokładnie jeden parametr, czyli słownikową reprezentację zdarzenia, które wyzwoliło ten element zaczepienia zdarzeń. Każda wartość zwracana z funkcji punktu zaczepienia zdarzeń jest ignorowana.

Przykład: Wybieranie określonych zdarzeń do przetwarzania

W poniższym przykładzie pokazano punkt zaczepienia zdarzeń, który wybiera określone zdarzenia do przetwarzania. W szczególności ten przykład oczekuje na odebranie zdarzeń potoku STOPPING , a następnie wyprowadza komunikat do dzienników sterowników stdout.

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Przykład: wysyłanie wszystkich zdarzeń do kanału usługi Slack

W poniższym przykładzie zaimplementowano punkt zaczepienia zdarzeń, który wysyła wszystkie zdarzenia odebrane do kanału usługi Slack przy użyciu interfejsu API usługi Slack.

W tym przykładzie użyto wpisu tajnego usługi Databricks do bezpiecznego przechowywania tokenu wymaganego do uwierzytelnienia w interfejsie API usługi Slack.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Przykład: Konfigurowanie elementu zaczepienia zdarzeń w celu wyłączenia po czterech kolejnych awariach

W poniższym przykładzie pokazano, jak skonfigurować punkt zaczepienia zdarzeń, który jest wyłączony, jeśli niepowodzenie zakończy się niepowodzeniem cztery razy.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Przykład: przetok Delta Live Tables z hakiem zdarzeń

W poniższym przykładzie pokazano dodanie elementu zaczepienia zdarzeń do kodu źródłowego potoku. Jest to prosty, ale kompletny przykład użycia punktów zaczepienia zdarzeń z potokiem.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })