Определение пользовательского мониторинга конвейеров Delta Live Tables с помощью перехватчиков событий
Внимание
Поддержка перехватчиков событий доступна в общедоступной предварительной версии.
Можно использовать перехватчики событий для добавления пользовательских функций обратного вызова на Python, которые выполняются при сохранении событий в журнале событий Delta Live Tables . Вы можете использовать перехватчики событий для реализации пользовательских решений мониторинга и оповещений. Например, можно использовать перехватчики событий для отправки сообщений электронной почты или записи в журнал при возникновении определенных событий или интеграции с сторонними решениями для мониторинга событий конвейера.
Вы определяете перехватчик событий с функцией Python, которая принимает один аргумент, где аргумент является словарем, представляющим событие. Затем вы включаете перехватчики событий в состав исходного кода для конвейера. Все перехватчики событий, определенные в конвейере, попытаются обработать все события, созданные во время каждого обновления конвейера. Если конвейер состоит из нескольких артефактов исходного кода, например нескольких записных книжек, к всему конвейеру применяются все определенные перехватчики событий. Хотя перехватчики событий включены в исходный код для конвейера, они не включены в граф конвейера.
Вы можете использовать перехватчики событий совместно с конвейерами, которые публикуют данные в хранилища метаданных Hive или Unity Catalog.
Примечание.
- Python — единственный язык, поддерживаемый для определения перехватчиков событий. Чтобы определить пользовательские функции Python, обрабатывающие события в конвейере, реализованные с помощью интерфейса SQL, добавьте пользовательские функции в отдельную записную книжку Python, которая выполняется в рамках конвейера. Функции Python применяются ко всему конвейеру при запуске конвейера.
- Перехватчики событий активируются только для тех событий, где уровень зрелости — это
STABLE
. - Перехватчики событий выполняются асинхронно из обновлений конвейера, но синхронно с другими перехватчиками событий. Это означает, что одновременно выполняется только один перехватчик событий, а другие перехватчики событий ожидают выполнения до завершения перехватчика событий в данный момент. Если перехватчик событий выполняется неограниченное время, он блокирует все остальные перехватчики событий.
- Delta Live Tables пытаются запустить каждый хук события при каждом событии, генерируемом во время обновления конвейера. Чтобы убедиться, что перехватчики событий имеют время обрабатывать все поставленные в очередь события, Delta Live Tables ждет не настраиваемое фиксированное время перед завершением процесса, запускающего конвейер. Однако не гарантируется, что все перехватчики активируются во всех событиях до завершения вычисления.
Мониторинг обработки перехватчика событий
Используйте тип события hook_progress
в журнале событий Delta Live Tables для мониторинга состояния хуков событий обновления. Чтобы предотвратить циклические зависимости, перехватчики событий не активируются для hook_progress
событий.
Определение перехватчика событий
Чтобы определить перехватчик событий, используйте on_event_hook
декоратор:
@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
# Python code defining the event hook
В этом max_allowable_consecutive_failures
разделе описывается максимальное число последовательных попыток перехватчика событий, прежде чем оно отключено. Сбой перехватчика событий определяется в любой момент, когда перехватчик событий создает исключение. Если перехватчик событий отключен, он не обрабатывает новые события до перезапуска конвейера.
max_allowable_consecutive_failures
должно быть целым числом, превышающим или равным 0
или None
равным. Значение None
(назначенное по умолчанию) означает, что количество последовательных сбоев, разрешенных для перехватчика событий, не ограничено, а перехватчик событий никогда не отключается.
Сбои перехвата событий и отключение перехватчиков событий можно отслеживать в журнале событий как hook_progress
события.
Функция перехватчика событий должна быть функцией Python, которая принимает ровно один параметр, словарное представление события, которое активировало этот перехватчик событий. Любое возвращаемое значение функции перехватчика событий игнорируется.
Пример. Выбор определенных событий для обработки
В следующем примере показан перехватчик событий, который выбирает определенные события для обработки. В частности, в этом примере ожидается получение событий конвейера STOPPING
, а затем выводится сообщение в журналы stdout
драйверов.
@on_event_hook
def my_event_hook(event):
if (
event['event_type'] == 'update_progress' and
event['details']['update_progress']['state'] == 'STOPPING'
):
print('Received notification that update is stopping: ', event)
Пример. Отправка всех событий в канал Slack
В следующем примере реализован перехватчик событий, который отправляет все события, полученные в канал Slack с помощью API Slack.
В этом примере используется секрет Databricks для безопасного хранения маркера, необходимого для проверки подлинности в API Slack.
from dlt import on_event_hook
import requests
# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
@on_event_hook
def write_events_to_slack(event):
res = requests.post(
url='https://slack.com/api/chat.postMessage',
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN,
},
json={
'channel': '<channel-id>',
'text': 'Received event:\n' + event,
}
)
Пример. Настройка перехватчика событий для отключения после четырех последовательных сбоев
В следующем примере показано, как настроить перехватчик событий, который отключен, если он завершается последовательно четыре раза.
from dlt import on_event_hook
import random
def run_failing_operation():
raise Exception('Operation has failed')
# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
run_failing_operation()
Пример: конвейер Delta Live Tables с обработчиком событий
В следующем примере показано добавление перехватчика событий в исходный код конвейера. Это простой, но полный пример использования перехватчиков событий с конвейером.
from dlt import table, on_event_hook, read
import requests
import json
import time
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN
}
# Create a single dataset.
@table
def test_dataset():
return spark.range(5)
# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
'channel': DEV_CHANNEL,
'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
})