다음을 통해 공유


이벤트 후크를 사용하여 Delta Live Tables 파이프라인의 사용자 지정 모니터링 정의

Important

이벤트 후크에 대한 지원은 공개 미리 보기제공됩니다.

이벤트 후크 사용하여 이벤트가 Delta Live Tables 파이프라인의 이벤트 로그지속될 때 실행되는 사용자 지정 Python 콜백 함수를 추가할 수 있습니다. 이벤트 후크를 사용하여 사용자 지정 모니터링 및 경고 솔루션을 구현할 수 있습니다. 예를 들어 이벤트 후크를 사용하여 특정 이벤트가 발생할 때 전자 메일을 보내거나 로그에 쓰거나 타사 솔루션과 통합하여 파이프라인 이벤트를 모니터링할 수 있습니다.

단일 인수를 허용하는 Python 함수를 사용하여 이벤트 후크를 정의합니다. 여기서 인수는 이벤트를 나타내는 사전입니다. 그런 다음 파이프라인에 대한 소스 코드의 일부로 이벤트 후크를 포함합니다. 파이프라인에 정의된 모든 이벤트 후크는 각 파이프라인 업데이트 중에 생성된 모든 이벤트를 처리하려고 시도합니다. 파이프라인이 여러 소스 코드 아티팩트(예: 여러 Notebook)로 구성된 경우 정의된 이벤트 후크가 전체 파이프라인에 적용됩니다. 이벤트 후크는 파이프라인의 소스 코드에 포함되지만 파이프라인 그래프에는 포함되지 않습니다.

Hive 메타스토어 또는 Unity 카탈로그에 게시하는 파이프라인과 함께 이벤트 후크를 사용할 수 있습니다.

참고 항목

  • Python은 이벤트 후크를 정의하는 데 지원되는 유일한 언어입니다. SQL 인터페이스를 사용하여 구현된 파이프라인에서 이벤트를 처리하는 사용자 지정 Python 함수를 정의하려면 파이프라인의 일부로 실행되는 별도의 Python Notebook에 사용자 지정 함수를 추가합니다. Python 함수는 파이프라인이 실행되면 전체 파이프라인에 적용됩니다.
  • 이벤트 후크는 maturity_levelSTABLE일 때만 트리거됩니다.
  • 이벤트 후크는 파이프라인 업데이트에서 비동기적으로 실행되지만 다른 이벤트 후크와 동기적으로 실행됩니다. 즉, 한 번에 하나의 이벤트 후크만 실행되고 다른 이벤트 후크는 현재 실행 중인 이벤트 후크가 완료될 때까지 실행되기를 기다립니다. 이벤트 후크가 무기한 실행되면 다른 모든 이벤트 후크가 차단됩니다.
  • Delta Live Tables는 파이프라인 업데이트 중에 내보내는 모든 이벤트에서 각 이벤트 후크를 실행하려고 시도합니다. 지연 이벤트 후크가 대기 중인 모든 이벤트를 처리할 시간을 갖도록 하기 위해 Delta Live Tables는 파이프라인을 실행하는 컴퓨팅을 종료하기 전에 구성할 수 없는 고정 기간을 기다립니다. 그러나 컴퓨팅이 종료되기 전에 모든 이벤트에서 모든 후크가 트리거되는 것은 아닙니다.

이벤트 후크 처리 모니터링

Delta Live Tables 이벤트 로그의 hook_progress 이벤트 유형을 사용하여 업데이트의 이벤트 후크 상태를 모니터링합니다. 순환 종속성을 방지하기 위해 이벤트에 대해 hook_progress 이벤트 후크가 트리거되지 않습니다.

이벤트 후크 정의

이벤트 후크를 정의하려면 데코레이터를 on_event_hook 사용합니다.

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

이벤트 max_allowable_consecutive_failures 후크가 비활성화되기 전에 실패할 수 있는 최대 연속 횟수를 설명합니다. 이벤트 후크 오류는 이벤트 후크가 예외를 throw할 때마다 정의됩니다. 이벤트 후크를 사용하지 않도록 설정하면 파이프라인이 다시 시작될 때까지 새 이벤트를 처리하지 않습니다.

max_allowable_consecutive_failures는 같거나 0None 거나 같은 정수여야 합니다. None 값(기본적으로 할당됨)은 이벤트 후크에 허용되는 연속 실패 수에 제한이 없으며 이벤트 후크를 사용하지 않도록 설정하지 않음을 의미합니다.

이벤트 후크 오류 및 이벤트 후크 비활성화는 이벤트 로그에서 이벤트로 hook_progress 모니터링할 수 있습니다.

이벤트 후크 함수는 정확히 하나의 매개 변수를 허용하는 Python 함수여야 하며, 이 이벤트 후크를 트리거한 이벤트의 사전 표현이어야 합니다. 이벤트 후크 함수의 반환 값은 무시됩니다.

예: 처리를 위한 특정 이벤트 선택

다음 예제에서는 처리를 위해 특정 이벤트를 선택하는 이벤트 후크를 보여 줍니다. 특히 이 예제에서는 파이프라인 STOPPING 이벤트가 수신될 때까지 기다린 다음 드라이버 로그에 메시지를 출력합니다 stdout.

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

예: Slack 채널에 모든 이벤트 보내기

다음 예제에서는 Slack API를 사용하여 Slack 채널에 수신된 모든 이벤트를 보내는 이벤트 후크를 구현합니다.

이 예제에서는 Databricks 비밀을 사용하여 Slack API에 인증하는 데 필요한 토큰을 안전하게 저장합니다.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

예: 4회 연속 실패 후 사용하지 않도록 이벤트 후크 구성

다음 예제에서는 연속적으로 4번 실패할 경우 비활성화된 이벤트 후크를 구성하는 방법을 보여 줍니다.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

예: 이벤트 후크가 있는 Delta Live Tables 파이프라인

다음 예제에서는 파이프라인의 소스 코드에 이벤트 후크를 추가하는 방법을 보여 줍니다. 파이프라인과 함께 이벤트 후크를 사용하는 간단한 예제입니다.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })