Dela via


Definiera anpassad övervakning av DLT-flöden med händelseutlösare

Viktig

Stöd för händelsekrokar finns i offentlig förhandsversion.

Du kan använda händelsekrokar för att lägga till anpassade Python-återanropsfunktioner som körs när händelser sparas i en DLT-pipelines händelselogg. Du kan använda händelsekrokar för att implementera anpassade lösningar för övervakning och aviseringar. Du kan till exempel använda händelsekrokar för att skicka e-postmeddelanden eller skriva till en logg när specifika händelser inträffar eller integrera med lösningar från tredje part för att övervaka pipelinehändelser.

Du definierar en händelsekrok med en Python-funktion som accepterar ett enda argument, där argumentet är en ordlista som representerar en händelse. Sedan inkluderar du händelsekrokerna som en del av källkoden för en pipeline. Alla händelsekrokar som definierats i en pipeline försöker bearbeta alla händelser som genereras under varje pipelineuppdatering. Om din pipeline består av flera källkodsartefakter, till exempel flera notebook-filer, tillämpas alla definierade händelsekrokar på hela pipelinen. Även om händelsekrokar ingår i källkoden för pipelinen ingår de inte i pipelinediagrammet.

Du kan använda händelsekrokar med pipelines som publicerar till Hive-metaarkivet eller Unity Catalog.

Notera

  • Python är det enda språk som stöds för att definiera händelsekrokar. Om du vill definiera anpassade Python-funktioner som bearbetar händelser i en pipeline som implementeras med hjälp av SQL-gränssnittet lägger du till de anpassade funktionerna i en separat Python-notebook-fil som körs som en del av pipelinen. Python-funktionerna tillämpas på hela pipelinen när pipelinen körs.
  • Event hooks aktiveras endast för händelser där maturity_level är STABLE.
  • Händelsekrokar körs asynkront från pipelineuppdateringar men synkront med andra händelsekrokar. Det innebär att endast en händelsekrok körs åt gången, och att andra händelsekrokar väntar med att köras tills den nuvarande händelsekroken är slutförd. Om en händelsekrok körs på obestämd tid blockerar den alla andra händelsekrokar.
  • DLT försöker köra varje händelsekrok på varje händelse som genereras under en pipelineuppdatering. För att säkerställa att fördröjda eventkrokar har tid att bearbeta alla köade händelser, väntar DLT en icke-konfigurerbar fast period innan beräkningsprocessen som kör pipelinen avslutas. Det är dock inte garanterat att alla hooks utlöses vid alla händelser innan beräkningsprocessen avslutas.

Övervaka händelsekrokbearbetning

Använd hook_progress händelsetypen i DLT-händelseloggen för att övervaka tillståndet för en uppdaterings händelsekrokar. För att förhindra cirkulära beroenden utlöses inte händelsekrokar för hook_progress händelser.

Definiera en händelsekrok

Om du vill definiera en händelsekrok använder du on_event_hook dekoratör:

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

I max_allowable_consecutive_failures beskrivs det maximala antalet på varandra följande gånger som en händelsekrok kan misslyckas innan den inaktiveras. Ett fel i en händelsekrok definieras som närhelst händelsekroken utlöser ett undantag. Om en händelsekrok är inaktiverad bearbetas inte nya händelser förrän pipelinen startas om.

max_allowable_consecutive_failures måste vara ett heltal som är större än eller lika med 0 eller None. Värdet None (tilldelas som standard) innebär att det inte finns någon gräns för antalet efterföljande fel som tillåts för händelsekroken, och händelsekroken inaktiveras aldrig.

Misslyckanden vid händelsekrokar och inaktivering av händelsekrokar kan övervakas i händelseloggen som hook_progress-händelser.

Funktionen event hook måste vara en Python-funktion som accepterar exakt en parameter, en ordlisterepresentation av händelsen som utlöste den här händelsekroken. Eventuella returvärden från händelsekroksfunktionen ignoreras.

Exempel: Välj specifika händelser för bearbetning

I följande exempel visas en händelsekrok som väljer specifika händelser för bearbetning. Mer specifikt väntar detta exempel tills händelser från pipeline STOPPING tas emot och sedan matar ut ett meddelande till förarens loggar stdout.

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Exempel: Skicka alla händelser till en Slack-kanal

I följande exempel implementeras en händelsekrok som skickar alla händelser som tagits emot till en Slack-kanal med hjälp av Slack-API:et.

I det här exemplet används en Databricks-hemlighet för att lagra en token som krävs för att autentisera till Slack-API:et på ett säkert sätt.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Exempel: Konfigurera en händelsekrok för att inaktivera efter fyra på varandra följande fel

I följande exempel visas hur du konfigurerar en händelsekrok som är inaktiverad om den misslyckas fyra gånger i följd.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Exempel: En DLT-pipeline med en händelsekrok

I följande exempel visas hur du lägger till en händelsekrok i källkoden för en pipeline. Det här är ett enkelt men komplett exempel på hur du använder händelsekrokar med en pipeline.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })