Delen via


Aangepaste bewaking van Delta Live Tables-pijplijnen definiëren met gebeurtenishook

Belangrijk

Ondersteuning voor gebeurtenishook is beschikbaar in openbare preview.

U kunt gebeurtenishook gebruiken om aangepaste Python-callbackfuncties toe te voegen die worden uitgevoerd wanneer gebeurtenissen worden bewaard in het gebeurtenislogboek van een Delta Live Tables-pijplijn. U kunt gebeurtenishook gebruiken om aangepaste bewakings- en waarschuwingsoplossingen te implementeren. U kunt bijvoorbeeld gebeurtenishook gebruiken om e-mailberichten te verzenden of naar een logboek te schrijven wanneer specifieke gebeurtenissen optreden of om te integreren met oplossingen van derden om pijplijngebeurtenissen te bewaken.

U definieert een gebeurtenishook met een Python-functie die één argument accepteert, waarbij het argument een woordenlijst is die een gebeurtenis vertegenwoordigt. Vervolgens neemt u de gebeurtenishook op als onderdeel van de broncode voor een pijplijn. Elke gebeurtenishook die in een pijplijn is gedefinieerd, probeert alle gebeurtenissen te verwerken die tijdens elke pijplijnupdate zijn gegenereerd. Als uw pijplijn bestaat uit meerdere broncodeartefacten, bijvoorbeeld meerdere notebooks, worden eventuele gedefinieerde gebeurtenishook toegepast op de hele pijplijn. Hoewel gebeurtenishook wordt opgenomen in de broncode voor uw pijplijn, worden deze niet opgenomen in de pijplijngrafiek.

U kunt gebeurtenishook gebruiken met pijplijnen die publiceren naar de Hive-metastore of Unity Catalog.

Notitie

  • Python is de enige taal die wordt ondersteund voor het definiëren van gebeurtenishook. Als u aangepaste Python-functies wilt definiëren die gebeurtenissen verwerken in een pijplijn die is geïmplementeerd met behulp van de SQL-interface, voegt u de aangepaste functies toe in een afzonderlijk Python-notebook dat wordt uitgevoerd als onderdeel van de pijplijn. De Python-functies worden toegepast op de hele pijplijn wanneer de pijplijn wordt uitgevoerd.
  • Gebeurtenishook wordt alleen geactiveerd voor gebeurtenissen waarbij de maturity_level is STABLE.
  • Gebeurtenishook wordt asynchroon uitgevoerd vanuit pijplijnupdates, maar synchroon met andere gebeurtenishook. Dit betekent dat slechts één gebeurtenishook tegelijk wordt uitgevoerd en dat andere gebeurtenishaken wachten totdat de huidige actieve gebeurtenishook is voltooid. Als een gebeurtenishook voor onbepaalde tijd wordt uitgevoerd, worden alle andere gebeurtenishook geblokkeerd.
  • Delta Live Tables probeert elke gebeurtenishook uit te voeren op elke gebeurtenis die wordt gegenereerd tijdens een pijplijnupdate. Om ervoor te zorgen dat het vastlopen van gebeurtenishook tijd heeft om alle gebeurtenissen in de wachtrij te verwerken, wacht Delta Live Tables een niet-configureerbare vaste periode voordat de berekening wordt beëindigd waarop de pijplijn wordt uitgevoerd. Het is echter niet gegarandeerd dat alle hooks worden geactiveerd op alle gebeurtenissen voordat de berekening wordt beëindigd.

Verwerking van gebeurtenishook bewaken

Gebruik het hook_progress gebeurtenistype in het gebeurtenislogboek van Delta Live Tables om de status van de gebeurtenishook van een update te bewaken. Om circulaire afhankelijkheden te voorkomen, worden gebeurtenishooken niet geactiveerd voor hook_progress gebeurtenissen.

Een gebeurtenishook definiëren

Gebruik de on_event_hook decorator om een gebeurtenishook te definiëren:

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

Hierin max_allowable_consecutive_failures wordt het maximum aantal opeenvolgende keren beschreven dat een gebeurtenishook kan mislukken voordat deze wordt uitgeschakeld. Een gebeurtenishookfout wordt gedefinieerd als elke keer dat de gebeurtenishook een uitzondering genereert. Als een gebeurtenishook is uitgeschakeld, worden nieuwe gebeurtenissen pas verwerkt nadat de pijplijn opnieuw is opgestart.

max_allowable_consecutive_failures moet een geheel getal zijn dat groter is dan of gelijk is aan 0 of None. Een waarde van None (standaard toegewezen) betekent dat er geen limiet is voor het aantal opeenvolgende fouten dat is toegestaan voor de gebeurtenishook en dat de gebeurtenishook nooit is uitgeschakeld.

Gebeurtenishookfouten en het uitschakelen van gebeurtenishookhaaks kunnen als gebeurtenissen in het gebeurtenislogboek hook_progress worden bewaakt.

De gebeurtenishookfunctie moet een Python-functie zijn die precies één parameter accepteert, een woordenlijstweergave van de gebeurtenis die deze gebeurtenishook heeft geactiveerd. Elke retourwaarde van de gebeurtenishookfunctie wordt genegeerd.

Voorbeeld: Specifieke gebeurtenissen selecteren voor verwerking

In het volgende voorbeeld ziet u een gebeurtenishook die specifieke gebeurtenissen selecteert voor verwerking. Dit voorbeeld wacht met name totdat pijplijngebeurtenissen STOPPING zijn ontvangen en voert vervolgens een bericht uit naar de stuurprogrammalogboeken stdout.

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Voorbeeld: Alle gebeurtenissen verzenden naar een Slack-kanaal

In het volgende voorbeeld wordt een gebeurtenishook geïmplementeerd waarmee alle gebeurtenissen die zijn ontvangen naar een Slack-kanaal worden verzonden met behulp van de Slack-API.

In dit voorbeeld wordt een Databricks-geheim gebruikt om veilig een token op te slaan dat is vereist voor verificatie bij de Slack-API.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Voorbeeld: Een gebeurtenishook configureren om uit te schakelen na vier opeenvolgende fouten

In het volgende voorbeeld ziet u hoe u een gebeurtenishook configureert die is uitgeschakeld als deze opeenvolgend vier keer mislukt.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Voorbeeld: Een Delta Live Tables-pijplijn met een gebeurtenishook

In het volgende voorbeeld ziet u hoe u een gebeurtenishook toevoegt aan de broncode voor een pijplijn. Dit is een eenvoudig maar volledig voorbeeld van het gebruik van gebeurtenishook met een pijplijn.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })