Teilen über


Definieren der benutzerdefinierten Überwachung von Pipelines mit Ereignishaken

Von Bedeutung

Die Unterstützung für Ereignishaken befindet sich in der öffentlichen Vorschau.

Sie können Ereignishaken verwenden, um benutzerdefinierte Python-Rückruffunktionen hinzuzufügen, die ausgeführt werden, wenn Ereignisse im Ereignisprotokoll einer Pipeline beibehalten werden. Sie können Ereignishaken verwenden, um benutzerdefinierte Überwachungs- und Warnlösungen zu implementieren. Sie können z. B. Ereignishaken verwenden, um E-Mails zu senden oder in ein Protokoll zu schreiben, wenn bestimmte Ereignisse auftreten oder in Lösungen von Drittanbietern integriert werden, um Pipelineereignisse zu überwachen.

Definieren Sie einen Ereignishaken mit einer Python-Funktion, die ein einzelnes Argument akzeptiert, wobei das Argument ein Wörterbuch ist, das ein Ereignis darstellt. Schließen Sie dann die Ereignis-Hooks als Teil des Quellcodes für eine Pipeline ein. Alle in einer Pipeline definierten Ereignishaken versuchen, alle Ereignisse zu verarbeiten, die während jeder Pipelineaktualisierung generiert werden. Wenn Ihre Pipeline aus mehreren Quellcodedateien besteht, werden alle definierten Ereignishaken auf die gesamte Pipeline angewendet. Obwohl Ereignishaken im Quellcode für Ihre Pipeline enthalten sind, sind sie nicht im Pipelinediagramm enthalten.

Sie können Ereignishooks mit Pipelines verwenden, die im Hive-Metastore oder in Unity Catalog veröffentlichen.

Hinweis

  • Python ist die einzige Sprache, die zum Definieren von Ereignishaken unterstützt wird. Um benutzerdefinierte Python-Funktionen zu definieren, die Ereignisse in einer Pipeline verarbeiten, die mithilfe der SQL-Schnittstelle implementiert wird, fügen Sie die benutzerdefinierten Funktionen in einer separaten Python-Quelldatei hinzu, die als Teil der Pipeline ausgeführt wird. Wenn die Pipeline läuft, werden die Python-Funktionen auf die gesamte Pipeline angewendet.
  • Ereignishooks werden nur für Ereignisse ausgelöst, bei denen maturity_level auf STABLE festgelegt ist.
  • Ereignishooks werden asynchron über Pipelineupdates ausgeführt, jedoch synchron mit anderen Ereignishooks. Dies bedeutet, dass jeweils nur ein einzelner Ereignishaken ausgeführt wird, und andere Ereignishaken warten, bis der derzeit ausgeführte Ereignishaken abgeschlossen ist. Wenn ein Ereignishaken unbegrenzt ausgeführt wird, werden alle anderen Ereignishaken blockiert.
  • Lakeflow Spark Declarative Pipelines (SDP) versucht, jeden Ereignis-Hook für jedes Ereignis auszuführen, das während eines Pipelineupdates ausgegeben wird. Um sicherzustellen, dass verzögerte Ereignishaken Zeit haben, alle in die Warteschlange gestellten Ereignisse zu verarbeiten, wartet SDP einen nicht konfigurierbaren festen Zeitraum ab, bevor die Recheneinheit, die die Pipeline ausführt, beendet wird. Es ist jedoch nicht garantiert, dass alle Hooks für alle Ereignisse ausgelöst werden, bevor die Berechnung beendet wird.

Überwachen der Verarbeitung von Ereignishooks

Verwenden Sie den hook_progress Ereignistyp im Pipelineereignisprotokoll, um den Status der Ereignishaken eines Updates zu überwachen. Um Zirkelabhängigkeiten zu verhindern, werden Ereignishooks für hook_progress Ereignisse nicht ausgelöst.

Definieren eines Ereignishooks

Verwenden Sie zum Definieren eines Ereignishakens den on_event_hook Dekorator:

@dp.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

max_allowable_consecutive_failures beschreibt die maximale Anzahl aufeinanderfolgender Fehlversuche eines Ereignishakens, bevor er deaktiviert wird. Definitionsgemäß tritt ein Ereignishookfehler immer dann auf, wenn der Ereignishook eine Ausnahme auslöst. Wenn ein Ereignishaken deaktiviert ist, werden erst neue Ereignisse verarbeitet, wenn die Pipeline neu gestartet wird.

max_allowable_consecutive_failures muss eine ganze Zahl sein, die größer oder gleich 0 oder None ist. Ein Wert von None (standardmäßig zugewiesen) bedeutet, dass es keine Beschränkung auf die Anzahl der für den Ereignishaken zulässigen aufeinanderfolgenden Fehler gibt, und der Ereignishaken wird nie deaktiviert.

Fehler bei Ereignishooks und die Deaktivierung von Ereignishooks können im Ereignisprotokoll als hook_progress-Ereignisse überwacht werden.

Die Ereignis-Hook-Funktion muss eine Python-Funktion sein, die genau einen Parameter akzeptiert, eine Wörterbuchdarstellung des Ereignisses, das diesen Ereignishaken ausgelöst hat. Jeder Rückgabewert aus der Ereignishakenfunktion wird ignoriert.

Beispiel: Auswählen bestimmter Ereignisse für die Verarbeitung

Das folgende Beispiel veranschaulicht einen Ereignishaken, der bestimmte Ereignisse für die Verarbeitung auswählt. In diesem Beispiel wartet der Ereignishook insbesondere, bis STOPPING-Ereignisse von der Pipeline empfangen werden, bevor eine Nachricht an die stdout-Treiberprotokolle ausgegeben wird.

@dp.on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Beispiel: Senden aller Ereignisse an einen Slack-Kanal

Im folgenden Beispiel wird ein Ereignishaken implementiert, der alle Ereignisse sendet, die mit der Slack-API an einen Slack-Kanal empfangen werden.

In diesem Beispiel wird ein Databricks-Geheimnis verwendet, um ein Token sicher zu speichern, das für die Authentifizierung bei der Slack-API erforderlich ist.

from pyspark import pipelines as dp
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@dp.on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Beispiel: Konfigurieren eines Ereignishakens zum Deaktivieren nach vier aufeinander folgenden Fehlern

Im folgenden Beispiel wird veranschaulicht, wie Sie einen Ereignishaken konfigurieren, der deaktiviert wird, wenn er vier Mal hintereinander fehlschlägt.

from pyspark import pipelines as dp
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@dp.on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Beispiel: Pipeline mit einem Ereignishaken

Das folgende Beispiel veranschaulicht das Hinzufügen eines Ereignishakens zum Quellcode für eine Pipeline. Dies ist ein einfaches, aber vollständiges Beispiel für die Verwendung von Ereignishaken mit einer Pipeline.

from pyspark import pipelines as dp
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@dp.table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@dp.on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })