Share via


Definire il monitoraggio personalizzato delle pipeline di tabelle live Delta con hook di eventi

Importante

Il supporto per gli hook di eventi è disponibile in anteprima pubblica.

È possibile usare hook di eventi per aggiungere funzioni di callback Python personalizzate eseguite quando gli eventi vengono salvati in modo permanente nel registro eventi di una pipeline di Tabelle live Delta. È possibile usare hook di eventi per implementare soluzioni personalizzate di monitoraggio e avvisi. Ad esempio, è possibile usare hook di eventi per inviare messaggi di posta elettronica o scrivere in un log quando si verificano eventi specifici o per l'integrazione con soluzioni di terze parti per monitorare gli eventi della pipeline.

Si definisce un hook di eventi con una funzione Python che accetta un singolo argomento, dove l'argomento è un dizionario che rappresenta un evento. Si includono quindi gli hook di eventi come parte del codice sorgente per una pipeline. Qualsiasi hook di eventi definito in una pipeline tenterà di elaborare tutti gli eventi generati durante ogni aggiornamento della pipeline. Se la pipeline è costituita da più artefatti di codice sorgente, ad esempio più notebook, tutti gli hook di eventi definiti vengono applicati all'intera pipeline. Anche se gli hook di eventi sono inclusi nel codice sorgente per la pipeline, non sono inclusi nel grafico della pipeline.

È possibile usare hook di eventi con pipeline che pubblicano nel metastore Hive o nel catalogo Unity.

Nota

  • Python è l'unico linguaggio supportato per la definizione di hook di eventi.
  • Gli hook eventi vengono attivati solo per gli eventi in cui il maturity_level è STABLE.
  • Gli hook di eventi vengono eseguiti in modo asincrono dagli aggiornamenti della pipeline, ma in modo sincrono con altri hook di eventi. Ciò significa che viene eseguito solo un singolo hook di eventi alla volta e altri hook di eventi attendono l'esecuzione fino al completamento dell'hook eventi attualmente in esecuzione. Se un hook eventi viene eseguito per un periodo illimitato, blocca tutti gli altri hook di eventi.
  • Delta Live Tables tenta di eseguire ogni hook di eventi su ogni evento generato durante un aggiornamento della pipeline. Per garantire che gli hook di eventi di ritardo abbiano tempo per elaborare tutti gli eventi in coda, le tabelle live delta attendono un periodo fisso non configurabile prima di terminare il calcolo che esegue la pipeline. Tuttavia, non è garantito che tutti gli hook vengano attivati su tutti gli eventi prima che il calcolo venga terminato.

Monitorare l'elaborazione degli hook eventi

Usare il hook_progress tipo di evento nel registro eventi di Tabelle live Delta per monitorare lo stato degli hook di eventi di un aggiornamento. Per evitare dipendenze circolari, gli hook di eventi non vengono attivati per hook_progress gli eventi.

Definire un hook eventi

Per definire un hook di eventi, usare l'elemento on_event_hook Decorator:

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

max_allowable_consecutive_failures Descrive il numero massimo di volte consecutive in cui un hook di eventi può avere esito negativo prima che venga disabilitato. Un errore di hook eventi viene definito come qualsiasi volta che l'hook eventi genera un'eccezione. Se un hook eventi è disabilitato, non elabora nuovi eventi fino al riavvio della pipeline.

max_allowable_consecutive_failures deve essere un numero intero maggiore o uguale a 0 o None. Un valore ( None assegnato per impostazione predefinita) indica che non esiste alcun limite al numero di errori consecutivi consentiti per l'hook eventi e l'hook eventi non è mai disabilitato.

Gli errori di hook degli eventi e la disabilitazione degli hook di eventi possono essere monitorati nel registro eventi come hook_progress eventi.

La funzione hook eventi deve essere una funzione Python che accetta esattamente un parametro, una rappresentazione del dizionario dell'evento che ha attivato questo hook eventi. Qualsiasi valore restituito dalla funzione hook eventi viene ignorato.

Esempio: Selezionare eventi specifici per l'elaborazione

Nell'esempio seguente viene illustrato un hook eventi che seleziona eventi specifici per l'elaborazione. In particolare, questo esempio attende fino a quando non vengono ricevuti gli eventi della pipeline STOPPING e quindi restituisce un messaggio al driver registra stdout.

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Esempio: Inviare tutti gli eventi a un canale Slack

L'esempio seguente implementa un hook eventi che invia tutti gli eventi ricevuti a un canale Slack usando l'API Slack.

Questo esempio usa un segreto databricks per archiviare in modo sicuro un token necessario per l'autenticazione all'API Slack.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Esempio: Configurare un hook eventi per disabilitare dopo quattro errori consecutivi

Nell'esempio seguente viene illustrato come configurare un hook eventi disabilitato se ha esito negativo consecutivamente quattro volte.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Esempio: una pipeline di tabelle live delta con un hook di eventi

Nell'esempio seguente viene illustrato l'aggiunta di un hook di eventi al codice sorgente per una pipeline. Si tratta di un esempio semplice ma completo dell'uso di hook di eventi con una pipeline.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })