Define custom monitoring of Delta Live Tables pipelines with event hooks

Important

Support for event hooks is in Public Preview.

You can use event hooks to add custom Python callback functions that run when events are persisted to a Delta Live Tables pipeline’s event log. Event hooks let you implement custom monitoring and alerting solutions. For example, an event hook can send emails or write to a log when specific events occur, or integrate with third-party solutions to monitor pipeline events.

You define an event hook with a Python function that accepts a single argument, where the argument is a dictionary representing an event. You then include the event hooks as part of the source code for a pipeline. Any event hooks defined in a pipeline will attempt to process all events generated during each pipeline update. If your pipeline is composed of multiple source code artifacts, for example, multiple notebooks, any defined event hooks are applied to the entire pipeline. Although event hooks are included in the source code for your pipeline, they are not included in the pipeline graph.

You can use event hooks with pipelines that publish to the Hive metastore or Unity Catalog.

Note

  • Python is the only language supported for defining event hooks. To define custom Python functions that process events in a pipeline implemented using the SQL interface, add the custom functions in a separate Python notebook that runs as part of the pipeline. The Python functions are applied to the entire pipeline when the pipeline runs.
  • Event hooks are triggered only for events where the maturity_level is STABLE.
  • Event hooks are executed asynchronously from pipeline updates but synchronously with other event hooks. This means that only a single event hook runs at a time, and other event hooks wait to run until the currently running event hook completes. If an event hook runs indefinitely, it blocks all other event hooks.
  • Delta Live Tables attempts to run each event hook on every event emitted during a pipeline update. To help ensure that lagging event hooks have time to process all queued events, Delta Live Tables waits a non-configurable fixed period before terminating the compute running the pipeline. However, it is not guaranteed that all hooks are triggered on all events before the compute is terminated.

Monitor event hook processing

Use the hook_progress event type in the Delta Live Tables event log to monitor the state of an update’s event hooks. To prevent circular dependencies, event hooks are not triggered for hook_progress events.
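As a minimal sketch of this kind of monitoring, the helper below picks the hook_progress records out of a list of event dictionaries. It assumes the event-log rows have already been loaded into plain Python dictionaries (for example, by collecting a query result). The helper name select_hook_progress and the sample records are hypothetical, and only the event_type field is relied on.

```python
def select_hook_progress(events):
    """Return only the events whose event_type is 'hook_progress'."""
    return [e for e in events if e.get("event_type") == "hook_progress"]


# Hypothetical sample rows; real event-log records carry more fields.
sample_events = [
    {"event_type": "update_progress", "message": "sample update message"},
    {"event_type": "hook_progress", "message": "sample hook message"},
    {"event_type": "flow_progress", "message": "sample flow message"},
]

hook_events = select_hook_progress(sample_events)
print(len(hook_events))
```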

Define an event hook

To define an event hook, use the on_event_hook decorator:

import dlt

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook
  pass

The max_allowable_consecutive_failures parameter sets the maximum number of consecutive times an event hook can fail before it is disabled. An event hook fails whenever it raises an exception. A disabled event hook does not process new events until the pipeline is restarted.

max_allowable_consecutive_failures must be either None or an integer greater than or equal to 0. A value of None (the default) means there is no limit on the number of consecutive failures, and the event hook is never disabled.

Event hook failures and disabling of event hooks can be monitored in the event log as hook_progress events.
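To make the counting rule concrete, the following toy model mirrors the behavior described above: a limit of N tolerates N consecutive failures and disables the hook on failure N + 1. It is only an illustration, not how Delta Live Tables is implemented. The class name HookFailureTracker is hypothetical, and the model assumes that a successful run resets the streak, which is how "consecutive" is read here.

```python
class HookFailureTracker:
    """Toy model of the consecutive-failure rule; not the DLT implementation."""

    def __init__(self, max_allowable_consecutive_failures=None):
        limit = max_allowable_consecutive_failures
        if limit is not None and (not isinstance(limit, int) or limit < 0):
            raise ValueError("limit must be None or an integer >= 0")
        self.limit = limit
        self.consecutive_failures = 0
        self.disabled = False

    def record(self, succeeded):
        """Record one hook invocation and return whether the hook is disabled."""
        if self.disabled:
            return True
        if succeeded:
            # Assumption: a success resets the failure streak.
            self.consecutive_failures = 0
        else:
            self.consecutive_failures += 1
            # Disable only once the streak exceeds the allowed limit.
            if self.limit is not None and self.consecutive_failures > self.limit:
                self.disabled = True
        return self.disabled


tracker = HookFailureTracker(max_allowable_consecutive_failures=3)
# Three failures, one success, then four failures in a row.
outcomes = [False, False, False, True, False, False, False, False]
states = [tracker.record(ok) for ok in outcomes]
print(states)
```

In this run the hook stays enabled through the first three failures and the success that follows, and is disabled only on the last entry, the fourth failure in a row.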

The event hook function must be a Python function that accepts exactly one parameter, a dictionary representation of the event that triggered this event hook. Any return value from the event hook function is ignored.
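Because any exception counts as a failure, it can help to read the event dictionary defensively. The sketch below builds a one-line summary without raising when a key is missing. The helper name describe_event is hypothetical, and the layout it assumes (type-specific details nested under a key named after the event type, with an optional state field) is inferred from the update_progress example in this article rather than from a documented schema.

```python
def describe_event(event):
    """Build a one-line summary of an event without raising on missing keys."""
    event_type = event.get("event_type", "unknown")
    details = event.get("details")
    # Assumption: details are nested under a key named after the event type.
    type_details = details.get(event_type) if isinstance(details, dict) else None
    state = type_details.get("state") if isinstance(type_details, dict) else None
    if state is None:
        return event_type
    return f"{event_type}: {state}"


stopping = {
    "event_type": "update_progress",
    "details": {"update_progress": {"state": "STOPPING"}},
}
print(describe_event(stopping))
print(describe_event({"event_type": "flow_progress", "details": None}))
```

A helper like this could be called from inside a function decorated with on_event_hook, so that an unexpected event shape does not count toward the hook's failure limit.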

Example: Select specific events for processing

The following example demonstrates an event hook that selects specific events for processing. The hook ignores every event except update_progress events whose state is STOPPING, and for those it prints a message to stdout in the driver logs.

from dlt import on_event_hook

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Example: Send all events to a Slack channel

The following example implements an event hook that sends all events received to a Slack channel using the Slack API.

This example uses a Databricks secret to securely store a token required to authenticate to the Slack API.

from dlt import on_event_hook
import json
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      # The event is a dictionary, so serialize it before concatenating.
      'text': 'Received event:\n' + json.dumps(event, default=str),
    }
  )
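The text field of the message must be a string, so the event dictionary has to be serialized before it is sent. The following sketch shows one way to do that while keeping long events readable. The helper name format_event_text and the max_chars cap are hypothetical choices made for this illustration; the cap is not a Slack limit.

```python
import json


def format_event_text(event, max_chars=3000):
    """Serialize an event dictionary to text suitable for a chat message."""
    # default=str converts values that json cannot encode (such as
    # datetime objects) to strings instead of raising TypeError.
    text = json.dumps(event, indent=2, sort_keys=True, default=str)
    if len(text) > max_chars:
        # Truncate and mark the cut so readers know the event was shortened.
        text = text[: max_chars - 3] + "..."
    return "Received event:\n" + text


sample = {"event_type": "update_progress", "details": {"update_progress": {"state": "STOPPING"}}}
print(format_event_text(sample))
```

The returned string could then be passed as the 'text' value in the request body.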

Example: Configure an event hook to disable after four consecutive failures

The following example demonstrates how to configure an event hook that is disabled after it fails four consecutive times.

from dlt import on_event_hook

def run_failing_operation():
  raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Example: A Delta Live Tables pipeline with an event hook

The following example demonstrates adding an event hook to the source code for a pipeline. This is a simple but complete example of using event hooks with a pipeline.

from dlt import table, on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
  'Content-Type': 'application/json',
  'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
  return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url=SLACK_POST_MESSAGE_URL,
    headers=SLACK_HTTPS_HEADER_COMMON,
    json={
      'channel': DEV_CHANNEL,
      'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
    }
  )
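The hook above does not inspect the HTTP response. The Slack Web API reports many errors in the JSON body, with "ok" set to false, rather than through the HTTP status code, so a rejected message would not raise an exception and would not count as an event hook failure. The sketch below turns such a response into an exception. The helper name raise_if_slack_error is hypothetical, and it operates on an already parsed response body so that it can be checked without network access.

```python
def raise_if_slack_error(payload):
    """Raise RuntimeError if a parsed Slack Web API response reports failure."""
    if not payload.get("ok", False):
        reason = str(payload.get("error", "unknown error"))
        raise RuntimeError("Slack API call failed: " + reason)


# A successful response body passes silently.
raise_if_slack_error({"ok": True})

# A failed response body raises, which a hook would surface as a failure.
try:
    raise_if_slack_error({"ok": False, "error": "channel_not_found"})
except RuntimeError as exc:
    print(exc)
```

Inside the hook, this could be called as raise_if_slack_error(res.json()) after the request, so that rejected messages count toward max_allowable_consecutive_failures and appear in the event log as hook_progress events.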