使用事件挂钩定义增量实时表管道的自定义监视

重要

事件挂钩支持为公共预览版

可以使用事件挂钩添加自定义 Python 回调函数,这些函数在事件持久保存到增量实时表管道的事件日志时运行。 可以使用事件挂钩来实现自定义监视和警报解决方案。 例如,可以使用事件挂钩在发生特定事件时发送电子邮件或写入日志,或者与第三方解决方案集成以监视管道事件。

使用接受单个参数的 Python 函数定义事件挂钩,其中该参数是表示事件的字典。 然后,在管道的源代码中加入事件挂钩。 管道中定义的任何事件挂钩都将尝试处理每个管道更新期间生成的所有事件。 如果管道由多个源代码项目组成,例如多个笔记本,则所有定义的事件挂钩都应用于整个管道。 尽管事件挂钩包含在管道的源代码中,但它们不包括在管道图中。

可以将事件挂钩与发布到 Hive 元存储或 Unity Catalog 的管道配合使用。

注意

  • Python 是唯一支持定义事件挂钩的语言。 若要定义在使用 SQL 接口实现的管道中处理事件的自定义 Python 函数,请在作为管道一部分运行的单独 Python 笔记本中添加自定义函数。 Python 函数在管道运行时应用于整个管道。
  • 只有 maturity_levelSTABLE 的事件才会触发事件挂钩。
  • 事件挂钩虽然与管道更新异步执行,但与其他事件挂钩同步执行。 这意味着一次只运行一个事件挂钩,其他事件挂钩等到当前正在运行的事件挂钩完成后才运行。 如果某个事件挂钩无限期运行,它将阻止所有其他事件挂钩。
  • 增量实时表尝试对管道更新期间发出的每个事件运行每个事件挂钩。 为了帮助确保滞后事件挂钩有时间处理所有排队事件,增量实时表会在终止运行管道的计算之前等待不可配置的固定期限。 但是,不能保证在终止计算之前在所有事件上触发所有挂钩。

监视事件挂钩处理

使用增量实时表事件日志中的 hook_progress 事件类型监视更新的事件挂钩的状态。 为了防止循环依赖,不会为 hook_progress 事件触发事件挂钩。

定义事件挂钩

若要定义事件挂钩,请使用 on_event_hook 修饰器:

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

max_allowable_consecutive_failures 描述在禁用事件挂钩之前事件挂钩可以失败的最大连续次数。 只要事件挂钩引发异常即可定义为事件挂钩失败。 如果禁用事件挂钩,则在重启管道之前,它不会处理新事件。

max_allowable_consecutive_failures 必须为大于或等于 0None 的整数。 None 的值(默认分配)表示事件挂钩允许的连续失败次数没有限制,并且永远不会禁用事件挂钩。

事件挂钩失败和禁用事件挂钩可以在事件日志中作为 hook_progress 事件进行监视。

事件挂钩函数必须是一个 Python 函数,它只接受一个参数,即触发此事件挂钩的事件的字典表示形式。 将忽略事件挂钩函数中的任何返回值。

示例:选择要处理的特定事件

以下示例演示了一个事件挂钩,该挂钩选择要处理的特定事件。 具体而言,此示例会等到收到管道 STOPPING 事件,然后将消息输出到驱动程序日志 stdout

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

示例:将所有事件发送到 Slack 通道

以下示例实现一个事件挂钩,该挂钩使用 Slack API 将接收的所有事件发送到 Slack 通道。

此示例使用 Databricks 机密安全地存储对 Slack API 进行身份验证所需的令牌。

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

示例:将事件挂钩配置为在连续四次失败后禁用

以下示例演示如何配置在连续四次失败后禁用的事件挂钩。

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

示例:具有事件挂钩的增量实时表管道

以下示例演示如何向管道的源代码添加事件挂钩。 这是在管道中使用事件挂钩的简单但完整的示例。

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })