Freigeben über


Benutzerdefinierte Überwachung von deklarativen Lakeflow-Pipelines mit Ereignis-Triggern definieren

Von Bedeutung

Die Unterstützung für Ereignishaken befindet sich in der öffentlichen Vorschau.

Sie können Ereignishaken verwenden, um benutzerdefinierte Python-Rückruffunktionen hinzuzufügen, die ausgeführt werden, wenn Ereignisse im Ereignisprotokoll einer Pipeline beibehalten werden. Sie können Ereignishaken verwenden, um benutzerdefinierte Überwachungs- und Warnlösungen zu implementieren. Sie können z. B. Ereignishaken verwenden, um E-Mails zu senden oder in ein Protokoll zu schreiben, wenn bestimmte Ereignisse auftreten oder in Lösungen von Drittanbietern integriert werden, um Pipelineereignisse zu überwachen.

Sie definieren einen Ereignishaken mit einer Python-Funktion, die ein einzelnes Argument akzeptiert, wobei das Argument ein Wörterbuch ist, das ein Ereignis darstellt. Anschließend fügen Sie die Ereignishaken als Teil des Quellcodes für eine Pipeline ein. Alle in einer Pipeline definierten Ereignishaken versuchen, alle Ereignisse zu verarbeiten, die während jeder Pipelineaktualisierung generiert werden. Wenn Ihre Pipeline aus mehreren Quellcodeartefakten besteht, z. B. aus mehreren Notizbüchern, werden alle definierten Ereignishaken auf die gesamte Pipeline angewendet. Obwohl Ereignishaken im Quellcode für Ihre Pipeline enthalten sind, sind sie nicht im Pipelinediagramm enthalten.

Sie können Ereignishooks mit Pipelines verwenden, die im Hive-Metastore oder Unity-Katalog veröffentlicht werden.

Hinweis

  • Python ist die einzige Sprache, die zum Definieren von Ereignishaken unterstützt wird. Um benutzerdefinierte Python-Funktionen zu definieren, die Ereignisse in einer Pipeline verarbeiten, die mithilfe der SQL-Schnittstelle implementiert wird, fügen Sie die benutzerdefinierten Funktionen in einem separaten Python-Notizbuch hinzu, das als Teil der Pipeline ausgeführt wird. Wenn die Pipeline läuft, werden die Python-Funktionen auf die gesamte Pipeline angewendet.
  • Ereignishaken werden nur für Ereignisse ausgelöst, bei denen das Maturity Level erfüllt ist STABLE.
  • Ereignis-Hooks werden asynchron von Pipeline-Updates ausgeführt, aber synchron mit anderen Ereignis-Hooks. Dies bedeutet, dass jeweils nur ein einzelner Ereignishaken ausgeführt wird, und andere Ereignishaken warten, bis der derzeit ausgeführte Ereignishaken abgeschlossen ist. Wenn ein Ereignishaken unbegrenzt ausgeführt wird, werden alle anderen Ereignishaken blockiert.
  • Lakeflow Declarative Pipelines versucht, jeden Ereignis-Hook für jedes Ereignis auszuführen, das während einer Pipeline-Aktualisierung ausgelöst wird. Um sicherzustellen, dass nachhinkende Ereignis-Hooks Zeit zum Verarbeiten aller in die Warteschlange gestellten Ereignisse haben, wartet Lakeflow Declarative Pipelines für einen nicht konfigurierbaren festen Zeitraum, bevor die Berechnungseinheit, die die Pipeline ausführt, beendet wird. Es ist jedoch nicht garantiert, dass alle Hooks für alle Ereignisse ausgelöst werden, bevor die Berechnung beendet wird.

Überwachen der Ereignishakenverarbeitung

Verwenden Sie den hook_progress Ereignistyp im Ereignisprotokoll "Lakeflow Declarative Pipelines", um den Status der Ereignishaken eines Updates zu überwachen. Um Zirkelabhängigkeiten zu verhindern, werden Ereignishooks für hook_progress Ereignisse nicht ausgelöst.

Definieren eines Ereignishakens

Verwenden Sie zum Definieren eines Ereignishakens den on_event_hook Dekorator:

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

max_allowable_consecutive_failures beschreibt die maximale Anzahl aufeinanderfolgender Fehlversuche eines Ereignishakens, bevor er deaktiviert wird. Ein Ereignis-Hook-Fehler tritt jedes Mal auf, wenn die Ereignis-Hook-Funktion eine Ausnahme auslöst. Wenn ein Ereignishaken deaktiviert ist, werden erst neue Ereignisse verarbeitet, wenn die Pipeline neu gestartet wird.

max_allowable_consecutive_failures muss eine ganze Zahl sein, die größer oder gleich 0 oder None ist. Ein Wert von None (standardmäßig zugewiesen) bedeutet, dass es keine Beschränkung auf die Anzahl der für den Ereignishaken zulässigen aufeinanderfolgenden Fehler gibt, und der Ereignishaken wird nie deaktiviert.

Fehler bei Ereignishooks und die Deaktivierung von Ereignishooks können im Ereignisprotokoll als hook_progress-Ereignisse überwacht werden.

Die Ereignis-Hook-Funktion muss eine Python-Funktion sein, die genau einen Parameter akzeptiert, eine Wörterbuchdarstellung des Ereignisses, das diesen Ereignishaken ausgelöst hat. Jeder Rückgabewert aus der Ereignishakenfunktion wird ignoriert.

Beispiel: Auswählen bestimmter Ereignisse für die Verarbeitung

Das folgende Beispiel veranschaulicht einen Ereignishaken, der bestimmte Ereignisse für die Verarbeitung auswählt. Insbesondere wartet dieses Beispiel, bis Pipelineereignisse STOPPING empfangen werden, und gibt dann eine Nachricht in die Treiberprotokolle stdoutaus.

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Beispiel: Senden aller Ereignisse an einen Slack-Kanal

Im folgenden Beispiel wird ein Ereignishaken implementiert, der alle Ereignisse sendet, die mit der Slack-API an einen Slack-Kanal empfangen werden.

In diesem Beispiel wird ein Databricks-Geheimnis verwendet, um ein Token sicher zu speichern, das für die Authentifizierung bei der Slack-API erforderlich ist.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Beispiel: Konfigurieren eines Ereignishakens zum Deaktivieren nach vier aufeinander folgenden Fehlern

Im folgenden Beispiel wird veranschaulicht, wie Sie einen Ereignishaken konfigurieren, der deaktiviert wird, wenn er vier Mal hintereinander fehlschlägt.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Beispiel: Lakeflow Declarative Pipelines mit einem Ereignishaken

Das folgende Beispiel veranschaulicht das Hinzufügen eines Ereignishakens zum Quellcode für eine Pipeline. Dies ist ein einfaches, aber vollständiges Beispiel für die Verwendung von Ereignishaken mit einer Pipeline.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })