Sdílet prostřednictvím


Definování vlastního monitorování kanálů DLT pomocí událostních hooků

Důležitý

Podpora pro událostní hooky je ve verzi Public Preview.

Použít můžete událostní háky k přidání vlastních Pythonových funkcí zpětného volání, které se spouštějí, když jsou události uchovávány v datové lince DLT protokolu událostí. Pomocí hooků událostí můžete implementovat vlastní řešení monitorování a upozorňování. Pomocí háků událostí můžete například odesílat e-maily nebo zapisovat do protokolu, když dojde k určitým událostem nebo k integraci s řešeními třetích stran pro monitorování událostí kanálu.

Definujete háček události pomocí funkce Pythonu, která přijímá jeden argument, kde je argument slovníkem představujícím událost. Poté zahrnete event hooky do zdrojového kódu pipeliny. Jakékoliv událostní háky definované v potrubí se pokusí zpracovat všechny události vytvořené během každé aktualizace potrubí. Pokud se kanál skládá z několika artefaktů zdrojového kódu, například z několika poznámkových bloků, použijí se všechny definované háky událostí na celý kanál. Ačkoli jsou událostní háčky zahrnuty do zdrojového kódu pro vaše potrubí, nejsou součástí grafu potrubí.

Můžete použít událostní háčky s datovými kanály, které publikují do metastoru Hive nebo katalogu Unity.

Poznámka

  • Python je jediný jazyk podporovaný pro definování háků událostí. Pokud chcete definovat vlastní funkce Pythonu, které zpracovávají události v kanálu implementovaném pomocí rozhraní SQL, přidejte vlastní funkce do samostatného poznámkového bloku Pythonu, který se spouští jako součást kanálu. Funkce Pythonu se aplikují na celou pipeline při spuštění pipeline.
  • Háky událostí se aktivují pouze pro události, u kterých je maturity_levelSTABLE.
  • Háky událostí se spouští asynchronně z aktualizací kanálu, ale synchronně s jinými háky událostí. To znamená, že v jednom okamžiku běží pouze jeden háček události a ostatní háčky události čekají, než aktuálně běžící háček události skončí. Pokud se háček události spustí na neomezenou dobu, zablokuje všechny ostatní háky událostí.
  • DLT se pokusí spustit každý událostní hák na každé události emitované během aktualizace pipeline. Aby se zajistilo, že opožděné háky událostí mají čas zpracovat všechny události ve frontě, DLT čeká nekonfigurovatelné pevné období před ukončením výpočetního procesu, který spouští kanál. Není však zaručeno, že se všechny hooky aktivují u všech událostí před ukončením výpočtů.

Monitorování zpracování událostního hooku

Pomocí typu události hook_progress v protokolu událostí DLT můžete monitorovat stav záznamů událostí aktualizace. Aby se zabránilo cyklickým závislostem, háky událostí nejsou aktivovány pro události hook_progress.

Definování háku události

K definování event hooku použijte on_event_hook dekorátor:

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

max_allowable_consecutive_failures popisuje maximální počet po sobě jdoucích selhání událostního háku, než bude zakázán. Selhání událostního háku je definováno jako každý případ, kdy událostní hák vyvolá výjimku. Pokud je událostní hák zakázán, nezpracovává nové události, dokud se potrubí nenastartuje.

max_allowable_consecutive_failures musí být celé číslo větší nebo rovno 0 nebo None. Hodnota None (přiřazená ve výchozím nastavení) znamená, že pro háček událostí není povolený žádný limit počtu po sobě jdoucích selhání a háček událostí není nikdy zakázán.

Selhání událostních háků a jejich zakázání je možné sledovat v protokolu událostí jako události hook_progress.

Funkce háku události musí být funkce Pythonu, která přijímá přesně jeden parametr, slovníkovou reprezentaci události, která aktivovala tento háček události. Jakákoli návratová hodnota funkce háku události se ignoruje.

Příklad: Výběr konkrétních událostí pro zpracování

Následující příklad ukazuje háku události, která vybere konkrétní události pro zpracování. Konkrétně tento příklad čeká na přijetí událostí potrubí STOPPING a poté zapíše zprávu do protokolů ovladače stdout.

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Příklad: Odeslání všech událostí do kanálu Slack

Následující příklad implementuje háček události, který odesílá všechny události přijaté do kanálu Slack pomocí rozhraní Slack API.

V tomto příkladu se k bezpečnému uložení tokenu potřebného k ověření v rozhraní Slack API používá tajný kód Databricks.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Nakonfigurujte hák události, aby se zakázal po čtyřech po sobě jdoucích selháních.

Následující příklad ukazuje, jak nakonfigurovat háček událostí, který je zakázán, pokud selže po sobě po sobě čtyřikrát.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Příklad: Kanál DLT s hákem události

Následující příklad ukazuje, jak přidat událostní hook do zdrojového kódu pro zpracování datové pipeline. Toto je jednoduchý, ale úplný příklad použití hooků událostí s kanálem.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })