Share via


Definiera anpassad övervakning av Delta Live Tables-pipelines med händelsekrokar

Viktigt!

Stöd för händelsekrokar finns i offentlig förhandsversion.

Du kan använda händelsekrokar för att lägga till anpassade Python-återanropsfunktioner som körs när händelser sparas i en Delta Live Tables-pipelines händelselogg. Du kan använda händelsekrokar för att implementera anpassade lösningar för övervakning och aviseringar. Du kan till exempel använda händelsekrokar för att skicka e-postmeddelanden eller skriva till en logg när specifika händelser inträffar eller integrera med lösningar från tredje part för att övervaka pipelinehändelser.

Du definierar en händelsekrok med en Python-funktion som accepterar ett enda argument, där argumentet är en ordlista som representerar en händelse. Sedan inkluderar du händelsekrokerna som en del av källkoden för en pipeline. Alla händelsekrokar som definierats i en pipeline försöker bearbeta alla händelser som genereras under varje pipelineuppdatering. Om din pipeline består av flera källkodsartefakter, till exempel flera notebook-filer, tillämpas alla definierade händelsekrokar på hela pipelinen. Även om händelsekrokar ingår i källkoden för pipelinen ingår de inte i pipelinediagrammet.

Du kan använda händelsekrokar med pipelines som publicerar till Hive-metaarkivet eller Unity Catalog.

Kommentar

  • Python är det enda språk som stöds för att definiera händelsekrokar.
  • Händelsekrokar utlöses endast för händelser där maturity_level är STABLE.
  • Händelsekrokar körs asynkront från pipelineuppdateringar men synkront med andra händelsekrokar. Det innebär att endast en enda händelsekrok körs i taget och att andra händelsekrokar väntar på att köras tills den händelsekrok som körs har slutförts. Om en händelsekrok körs på obestämd tid blockerar den alla andra händelsekrokar.
  • Delta Live Tables försöker köra varje händelsekrok på varje händelse som genereras under en pipelineuppdatering. För att säkerställa att eftersläpande händelsekrokar har tid att bearbeta alla köade händelser väntar Delta Live Tables en icke-konfigurerbar fast period innan beräkningen som kör pipelinen avslutas. Det är dock inte garanterat att alla krokar utlöses på alla händelser innan beräkningen avslutas.

Övervaka händelsekrokbearbetning

hook_progress Använd händelsetypen i Delta Live Tables-händelseloggen för att övervaka tillståndet för en uppdaterings händelsekrokar. För att förhindra cirkulära beroenden utlöses inte händelsekrokar för hook_progress händelser.

Definiera en händelsekrok

Om du vill definiera en händelsekrok använder du dekoratören on_event_hook :

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

Beskriver max_allowable_consecutive_failures det maximala antalet på varandra följande gånger som en händelsekrok kan misslyckas innan den inaktiveras. Ett händelsekrokfel definieras som varje gång händelsekroken utlöser ett undantag. Om en händelsekrok är inaktiverad bearbetas inte nya händelser förrän pipelinen startas om.

max_allowable_consecutive_failures måste vara ett heltal större än eller lika med 0 eller None. Värdet None (tilldelad som standard) innebär att det inte finns någon gräns för antalet efterföljande fel som tillåts för händelsekroken, och händelsekroken inaktiveras aldrig.

Händelsekrokfel och inaktivering av händelsekrokar kan övervakas i händelseloggen som hook_progress händelser.

Funktionen event hook måste vara en Python-funktion som accepterar exakt en parameter, en ordlisterepresentation av händelsen som utlöste den här händelsekroken. Alla returvärden från händelsekrokens funktion ignoreras.

Exempel: Välj specifika händelser för bearbetning

I följande exempel visas en händelsekrok som väljer specifika händelser för bearbetning. Mer specifikt väntar det här exemplet tills pipelinehändelser STOPPING tas emot och matar sedan ut ett meddelande till drivrutinsloggarna stdout.

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Exempel: Skicka alla händelser till en Slack-kanal

I följande exempel implementeras en händelsekrok som skickar alla händelser som tagits emot till en Slack-kanal med hjälp av Slack-API:et.

I det här exemplet används en Databricks-hemlighet för att lagra en token som krävs för att autentisera till Slack-API:et på ett säkert sätt.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Exempel: Konfigurera en händelsekrok för att inaktivera efter fyra på varandra följande fel

I följande exempel visas hur du konfigurerar en händelsekrok som är inaktiverad om den misslyckas fyra gånger i följd.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Exempel: En Delta Live Tables-pipeline med en händelsekrok

I följande exempel visas hur du lägger till en händelsekrok i källkoden för en pipeline. Det här är ett enkelt men komplett exempel på hur du använder händelsekrokar med en pipeline.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })