Aangepaste bewaking van DLT-pijplijnen definiëren met gebeurtenishooks
Belangrijk
Ondersteuning voor event hooks bevindt zich in openbare versie.
U kunt gebeurtenishook gebruiken om aangepaste Python-callbackfuncties toe te voegen die worden uitgevoerd wanneer gebeurtenissen worden bewaard in het gebeurtenislogboek van een DLT-pijplijn. U kunt gebeurtenishook gebruiken om aangepaste bewakings- en waarschuwingsoplossingen te implementeren. U kunt bijvoorbeeld gebeurtenishook gebruiken om e-mailberichten te verzenden of naar een logboek te schrijven wanneer specifieke gebeurtenissen optreden of om te integreren met oplossingen van derden om pijplijngebeurtenissen te bewaken.
U definieert een gebeurtenishook met een Python-functie die één argument accepteert, waarbij het argument een woordenlijst is die een gebeurtenis vertegenwoordigt. Vervolgens neemt u de gebeurtenishook op als onderdeel van de broncode voor een pijplijn. Elke gebeurtenishook die in een pijplijn is gedefinieerd, probeert alle gebeurtenissen te verwerken die tijdens elke pijplijnupdate zijn gegenereerd. Als uw pijplijn bestaat uit meerdere broncodeartefacten, bijvoorbeeld meerdere notebooks, worden eventuele gedefinieerde gebeurtenishook toegepast op de hele pijplijn. Hoewel gebeurtenishaken in de broncode voor uw pijplijn worden opgenomen, worden ze niet opgenomen in de pijplijngrafiek.
U kunt gebeurtenishook gebruiken met pijplijnen die publiceren naar de Hive-metastore of Unity Catalog.
Notitie
- Python is de enige taal die wordt ondersteund voor het definiëren van gebeurtenishook. Als u aangepaste Python-functies wilt definiëren die gebeurtenissen verwerken in een pijplijn die is geïmplementeerd met behulp van de SQL-interface, voegt u de aangepaste functies toe in een afzonderlijk Python-notebook dat wordt uitgevoerd als onderdeel van de pijplijn. De Python-functies worden op de gehele pijplijn toegepast wanneer deze draait.
- Gebeurtenishaken worden alleen geactiveerd voor gebeurtenissen waarbij het maturity_level is
STABLE
. - Gebeurtenishooks worden asynchroon uitgevoerd met pijplijnupdates, maar synchroon met andere gebeurtenishooks. Dit betekent dat slechts één gebeurtenishook tegelijk wordt uitgevoerd en dat andere gebeurtenishaken wachten totdat de huidige actieve gebeurtenishook is voltooid. Als een gebeurtenishook voor onbepaalde tijd wordt uitgevoerd, worden alle andere gebeurtenishook geblokkeerd.
- DLT probeert elke gebeurtenishook uit te voeren op elke gebeurtenis die wordt verzonden tijdens een pijplijnupdate. Om ervoor te zorgen dat de gebeurtenishooks die achterblijven tijd hebben om alle gebeurtenissen in de wachtrij te verwerken, wacht DLT een niet-aanpasbare vaste periode voordat de rekeninstantie waarop de pijplijn draait, wordt beëindigd. Het is echter niet gegarandeerd dat alle hooks worden geactiveerd op alle gebeurtenissen voordat de berekening wordt beëindigd.
Het monitoren van de verwerking van gebeurtenishooks
Gebruik het hook_progress
gebeurtenistype in het DLT-gebeurtenislogboek om de status van de gebeurtenishook van een update te controleren. Om kringafhankelijkheden te voorkomen, worden gebeurtenishooken niet geactiveerd voor hook_progress
gebeurtenissen.
Een gebeurtenishook definiëren
Als u een gebeurtenishook wilt definiëren, gebruikt u de on_event_hook
decorator:
@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
# Python code defining the event hook
De max_allowable_consecutive_failures
beschrijft het maximum aantal opeenvolgende keren dat een gebeurtenishook kan mislukken voordat deze wordt uitgeschakeld. Een gebeurtenishookfout wordt gedefinieerd als elke keer dat de gebeurtenishook een uitzondering genereert. Als een gebeurtenishook is uitgeschakeld, worden nieuwe gebeurtenissen pas verwerkt nadat de pijplijn opnieuw is opgestart.
max_allowable_consecutive_failures
moet een geheel getal groter dan of gelijk aan 0
of None
zijn. Een waarde van None
(standaard toegewezen) betekent dat er geen limiet is voor het aantal opeenvolgende fouten dat is toegestaan voor de gebeurtenishook en dat de gebeurtenishook nooit is uitgeschakeld.
Gebeurtenishookfouten, en het uitschakelen van gebeurtenishooks, kunnen als hook_progress
-gebeurtenissen worden bewaakt in het gebeurtenislogboek.
De gebeurtenishookfunctie moet een Python-functie zijn die precies één parameter accepteert, een woordenlijstweergave van de gebeurtenis die deze gebeurtenishook heeft geactiveerd. Elke retourwaarde van de gebeurtenishookfunctie wordt genegeerd.
Voorbeeld: Specifieke gebeurtenissen selecteren voor verwerking
In het volgende voorbeeld ziet u een gebeurtenishook die specifieke gebeurtenissen selecteert voor verwerking. Dit voorbeeld wacht met name totdat pijplijn-STOPPING
gebeurtenissen worden ontvangen en voert vervolgens een bericht uit naar de stuurprogrammalogboeken stdout
.
@on_event_hook
def my_event_hook(event):
if (
event['event_type'] == 'update_progress' and
event['details']['update_progress']['state'] == 'STOPPING'
):
print('Received notification that update is stopping: ', event)
Voorbeeld: Alle gebeurtenissen verzenden naar een Slack-kanaal
In het volgende voorbeeld wordt een gebeurtenishook geïmplementeerd waarmee alle gebeurtenissen die zijn ontvangen naar een Slack-kanaal worden verzonden met behulp van de Slack-API.
In dit voorbeeld wordt een Databricks-geheim gebruikt om een token veilig op te slaan dat is vereist voor verificatie bij de Slack-API.
from dlt import on_event_hook
import requests
# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
@on_event_hook
def write_events_to_slack(event):
res = requests.post(
url='https://slack.com/api/chat.postMessage',
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN,
},
json={
'channel': '<channel-id>',
'text': 'Received event:\n' + event,
}
)
Voorbeeld: Een gebeurtenishook configureren om uit te schakelen na vier opeenvolgende fouten
In het volgende voorbeeld ziet u hoe u een gebeurtenishook configureert die is uitgeschakeld als deze opeenvolgend vier keer mislukt.
from dlt import on_event_hook
import random
def run_failing_operation():
raise Exception('Operation has failed')
# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
run_failing_operation()
Voorbeeld: Een DLT-pijplijn met een event hook
In het volgende voorbeeld ziet u hoe u een gebeurtenishook toevoegt aan de broncode voor een pijplijn. Dit is een eenvoudig maar volledig voorbeeld van het gebruik van gebeurtenishook met een pijplijn.
from dlt import table, on_event_hook, read
import requests
import json
import time
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN
}
# Create a single dataset.
@table
def test_dataset():
return spark.range(5)
# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
'channel': DEV_CHANNEL,
'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
})