Freigeben über


Definieren der benutzerdefinierten Überwachung von Delta Live Tables-Pipelines mit Ereignishooks

Wichtig

Die Unterstützung für Ereignishooks befindet sich in Public Preview.

Sie können Ereignishooks verwenden, um benutzerdefinierte Python-Rückruffunktionen hinzuzufügen, die ausgeführt werden, wenn Ereignisse in einem Ereignisprotokoll einer Delta Live Tables-Pipeline gespeichert werden. Sie können Ereignishooks verwenden, um benutzerdefinierte Überwachungs- und Warnungslösungen zu implementieren. Sie können Ereignishooks beispielsweise verwenden, um E-Mails zu senden oder einen Schreibvorgang in ein Protokoll zu starten, wenn bestimmte Ereignisse auftreten, oder sie in Lösungen von Drittanbietern integrieren, um Pipelineereignisse zu überwachen.

Sie definieren einen Ereignishook mit einer Python-Funktion, die ein einzelnes Argument akzeptiert, wobei dieses Argument ein Wörterbuch ist, das ein Ereignis darstellt. Anschließend fügen Sie die Ereignishooks als Teil des Quellcodes für eine Pipeline ein. Sämtliche in einer Pipeline definierten Ereignishooks werden versuchen, alle Ereignisse zu verarbeiten, die während jedes Pipelineupdates generiert werden. Wenn Ihre Pipeline aus mehreren Quellcodeartefakten besteht, z. B. aus mehreren Notebooks, werden alle definierten Ereignishooks auf die gesamte Pipeline angewendet. Obwohl Ereignishooks im Quellcode für Ihre Pipeline enthalten sind, sind sie nicht im Pipelinediagramm enthalten.

Sie können Ereignishooks mit Pipelines verwenden, die im Hive-Metastore oder im Unity-Katalog veröffentlichen.

Hinweis

  • Python ist die einzige Sprache, die zum Definieren von Ereignishooks unterstützt wird. Um benutzerdefinierte Python-Funktionen zu definieren, die Ereignisse in einer Pipeline verarbeiten, die mithilfe der SQL-Schnittstelle implementiert wird, fügen Sie die benutzerdefinierten Funktionen in einem separaten Python-Notizbuch hinzu, das als Teil der Pipeline ausgeführt wird. Die Python-Funktionen werden auf die gesamte Pipeline angewendet, wenn die Pipeline ausgeführt wird.
  • Ereignishooks werden nur für Ereignisse ausgelöst, bei denen der maturity_level STABLE ist.
  • Ereignishooks werden asynchron über Pipelineupdates ausgeführt, jedoch synchron mit anderen Ereignishooks. Das bedeutet, dass jeweils nur ein einzelner Ereignishook ausgeführt wird, und andere Ereignishooks warten, bis der derzeit ausgeführte abgeschlossen ist. Wenn ein Ereignishook unbegrenzt ausgeführt wird, werden alle anderen Ereignishooks blockiert.
  • Delta Live Tables versucht, jeden Ereignishook für jedes Ereignis auszuführen, das während eines Pipelineupdates ausgegeben wird. Um sicherzustellen, dass Ereignishooks mit Verzögerungen Zeit zum Verarbeiten aller sich in der Warteschlange befindlichen Ereignisse haben, wartet Delta Live Tables einen nicht konfigurierbaren festen Zeitraum ab, bevor die Berechnung beendet wird, die die Pipeline ausführt. Es ist jedoch nicht garantiert, dass alle Hooks für alle Ereignisse ausgelöst werden, bevor die Berechnung beendet wird.

Überwachen der Verarbeitung von Ereignishooks

Verwenden Sie den Ereignistyp hook_progress im Delta Live Tables-Ereignisprotokoll, um den Status eines Updates eines Ereignishooks zu überwachen. Um Zirkelabhängigkeiten zu verhindern, werden Ereignishooks für hook_progress-Ereignisse nicht ausgelöst.

Definieren eines Ereignishooks

Verwenden Sie zum Definieren eines Ereignishooks den on_event_hook-Decorator:

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

Der Wert von max_allowable_consecutive_failures gibt die maximale Anzahl aufeinanderfolgender Male an, die ein Ereigniskook fehlschlagen kann, bevor er deaktiviert wird. Definitionsgemäß tritt ein Ereignishookfehler immer dann auf, wenn der Ereignishook eine Ausnahme auslöst. Wenn ein Ereignishook deaktiviert ist, verarbeitet er neue Ereignisse erst dann, wenn die Pipeline neu gestartet wird.

max_allowable_consecutive_failures muss eine ganze Zahl größer oder gleich 0 oder None sein. Ein Wert von None (standardmäßig zugewiesen) bedeutet, dass es keine Grenze für die Anzahl an für den Ereignishook zulässigen aufeinanderfolgenden Fehler gibt, und der Ereignishook somit nie deaktiviert wird.

Ereignishookfehler und die Deaktivierung von Ereignishooks können im Ereignisprotokoll als hook_progress-Ereignisse überwacht werden.

Die Ereignishookfunktion muss eine Python-Funktion sein, die genau einen Parameter akzeptiert, eine Wörterbuchdarstellung des Ereignisses, das diesen Ereignishook ausgelöst hat. Jeder Rückgabewert aus der Ereignishookfunktion wird ignoriert.

Beispiel: Auswählen bestimmter Ereignisse für die Verarbeitung

Im folgenden Beispiel wird ein Ereignishook demonstriert, der bestimmte Ereignisse für die Verarbeitung auswählt. In diesem Beispiel wartet der Ereignishook insbesondere, bis STOPPING-Ereignisse von der Pipeline empfangen werden, bevor eine Nachricht an die stdout-Treiberprotokolle ausgegeben wird.

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Beispiel: Senden aller Ereignisse an einen Slack-Kanal

Im folgenden Beispiel wird ein Ereignishook implementiert, der alle Ereignisse sendet, die mit der Slack-API an einen Slack-Kanal empfangen werden.

In diesem Beispiel wird ein Databricks-Geheimnis verwendet, um ein Token sicher zu speichern, das für die Authentifizierung bei der Slack-API erforderlich ist.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Beispiel: Konfigurieren eines Ereignishooks zum Deaktivieren nach vier aufeinanderfolgenden Fehlern

Im folgenden Beispiel wird veranschaulicht, wie Sie einen Ereignishook konfigurieren, der deaktiviert wird, wenn er viermal hintereinander einen Fehler erzeugt.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Beispiel: Eine Delta Live Tables-Pipeline mit einem Ereignishook

Das folgende Beispiel veranschaulicht das Hinzufügen eines Ereignisooks zum Quellcode für eine Pipeline. Dies ist ein einfaches, aber vollständiges Beispiel für die Verwendung von Ereignishooks mit einer Pipeline.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })