Поделиться через


Определение пользовательского мониторинга конвейеров DLT с помощью перехватчиков событий

Важный

Поддержка хуков событий представлена в общедоступной предварительной версии.

Можно использовать перехватчики событий для добавления пользовательских функций обратного вызова Python, которые запускаются при сохранении событий в журнале событий конвейера DLT. Вы можете использовать хуки событий для реализации пользовательских решений мониторинга и оповещений. Например, можно использовать перехватчики событий для отправки сообщений электронной почты или записи в журнал при возникновении определенных событий или интеграции с сторонними решениями для мониторинга событий конвейера.

Вы определяете перехватчик событий с функцией Python, которая принимает один аргумент, где аргумент является словарем, представляющим событие. Затем вы включаете перехватчики событий в состав исходного кода для конвейера. Все обработчики событий, определенные в конвейере, попытаются обработать все события, созданные во время каждого обновления конвейера. Если конвейер состоит из нескольких артефактов исходного кода, например нескольких записных книжек, ко всему конвейеру применяются определенные перехватчики событий. Хотя перехватчики событий включены в исходный код вашего конвейера, они не включены в граф конвейера.

Вы можете использовать хуки событий с конвейерами, которые публикуются в метахранилище Hive или каталоге Unity.

Заметка

  • Python — единственный поддерживаемый язык для определения перехватчиков событий. Чтобы определить пользовательские функции Python, обрабатывающие события в конвейере, реализованные с помощью интерфейса SQL, добавьте пользовательские функции в отдельную записную книжку Python, которая выполняется в рамках конвейера. Функции Python применяются ко всему конвейеру при запуске конвейера.
  • Перехватчики событий активируются только для событий, в которых maturity_levelSTABLE.
  • Перехватчики событий выполняются асинхронно относительно обновлений конвейера, но синхронно с другими перехватчиками событий. Это означает, что одновременно выполняется только один перехватчик событий, а другие перехватчики событий ожидают завершения выполнения текущего перехватчика событий. Если перехватчик событий выполняется неограниченное время, он блокирует все остальные перехватчики событий.
  • DLT пытается запустить каждый обработчик событий для каждого события, генерируемого во время обновления конвейера. Чтобы гарантировать, что отстающие крючки событий имеют время для обработки всех событий в очереди, система DLT ждет заданное фиксированное время перед завершением вычислений, выполняющих конвейер. Однако не гарантируется, что все хуки активируются для всех событий, прежде чем вычисление завершится.

Мониторинг обработки перехваченных событий

Используйте тип события hook_progress в журнале событий DLT для мониторинга состояния хуков событий обновления. Чтобы предотвратить циклические зависимости, перехватчики событий не активируются для событий hook_progress.

Определите перехватчик событий

Чтобы определить хуку события, используйте декоратор on_event_hook.

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

В max_allowable_consecutive_failures описывается максимальное число последовательных попыток перехватчика событий до отключения. Сбой перехватчика событий определяется в любой момент, когда перехватчик событий создает исключение. Если перехватчик событий отключен, он не обрабатывает новые события до перезапуска конвейера.

max_allowable_consecutive_failures должно быть целым числом, превышающим или равным 0 или None. Значение None (назначенное по умолчанию) означает, что количество последовательных сбоев, разрешенных для перехватчика событий, не ограничено, а перехватчик событий никогда не отключается.

Сбои перехватов событий и отключение перехватов событий можно отслеживать в журнале событий как события hook_progress.

Функция перехватчика событий должна быть функцией Python, которая принимает ровно один параметр, словарное представление события, которое активировало этот перехватчик событий. Любое возвращаемое значение функции перехватчика событий игнорируется.

Пример. Выбор определенных событий для обработки

В следующем примере демонстрируется обработчик событий, который выбирает определённые события для обработки. В частности, этот пример ожидает получения событий STOPPING конвейера, а затем выводит сообщение в журналы драйвера stdout.

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Пример. Отправка всех событий в канал Slack

В следующем примере реализован перехватчик событий, который отправляет все события, полученные в канал Slack с помощью API Slack.

В этом примере используется секрет Databricks для безопасного хранения маркера, необходимого для аутентификации в API Slack.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Пример. Настройка перехватчика событий для отключения после четырех последовательных сбоев

В следующем примере показано, как настроить перехватчик событий, который отключается, если он терпит неудачу подряд четыре раза.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Пример: Конвейер DLT с хуком событий

В следующем примере показано добавление перехватчика событий в исходный код конвейера. Это простой, но полный пример использования перехватчиков событий с конвейером.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })