Définir une surveillance personnalisée des pipelines Delta Live Tables avec des hooks d’événement
Important
La prise en charge des crochets d'événement est en version préliminaire publique.
Vous pouvez utiliser des crochets d'événement pour ajouter des fonctions de rappel Python personnalisées qui s'exécutent lorsque les événements sont conservés dans le journal des événements d'un pipeline Delta Live Tables. Vous pouvez utiliser des crochets d’événement pour implémenter des solutions personnalisées de surveillance et d’alerte. Par exemple, vous pouvez utiliser des crochets d'événement pour envoyer des e-mails ou écrire dans un journal lorsque des événements spécifiques se produisent ou pour intégrer des solutions tierces pour surveiller les événements de pipeline.
Vous définissez un crochet d'événement avec une fonction Python qui accepte un seul argument, où l'argument est un dictionnaire représentant un événement. Vous incluez ensuite les crochets d’événement dans le code source d’un pipeline. Tous les crochets d'événement définis dans un pipeline tenteront de traiter tous les événements générés lors de chaque mise à jour du pipeline. Si votre pipeline est composé de plusieurs artefacts de code source, par exemple plusieurs notebooks, tous les crochets d'événement définis sont appliqués à l'ensemble du pipeline. Bien que les crochets d'événement soient inclus dans le code source de votre pipeline, ils ne sont pas inclus dans le graphique du pipeline.
Vous pouvez utiliser des crochets d'événement avec des pipelines qui publient sur le metastore Hive ou sur le catalogue Unity.
Remarque
- Python est le seul langage pris en charge pour définir des crochets d'événements. Pour définir des fonctions Python personnalisées qui traitent des événements dans un pipeline implémenté à l’aide de l’interface SQL, ajoutez les fonctions personnalisées dans un bloc-notes Python distinct qui s’exécute dans le cadre du pipeline. Les fonctions Python sont appliquées à l’ensemble du pipeline lors de l’exécution du pipeline.
- Les crochets d'événement sont déclenchés uniquement pour les événements dont le maturity_level est
STABLE
. - Les crochets d'événements sont exécutés de manière asynchrone à partir des mises à jour du pipeline, mais de manière synchrone avec d'autres crochets d'événements. Cela signifie qu'un seul crochet d'événement s'exécute à la fois et que les autres crochets d'événement attendent de s'exécuter jusqu'à ce que le crochet d'événement en cours d'exécution se termine. Si un crochet d'événement s'exécute indéfiniment, il bloque tous les autres crochets d'événement.
- Delta Live Tables tente d'exécuter chaque crochet d'événement sur chaque événement émis lors d'une mise à jour du pipeline. Pour garantir que les crochets d'événements en retard ont le temps de traiter tous les événements en file d'attente, Delta Live Tables attend une période fixe non configurable avant de mettre fin au calcul exécutant le pipeline. Cependant, il n'est pas garanti que tous les crochets soient déclenchés sur tous les événements avant la fin du calcul.
Surveiller le traitement des crochets d'événements
Utilisez le type d'événement hook_progress
dans le journal des événements Delta Live Tables pour surveiller l'état des crochets d'événement d'une mise à jour. Pour éviter les dépendances circulaires, les crochets d'événements ne sont pas déclenchés pour les événements hook_progress
.
Définir un crochet d'événement
Pour définir un crochet d'événement, utilisez le décorateur on_event_hook
:
@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
# Python code defining the event hook
Le max_allowable_consecutive_failures
décrit le nombre maximum de fois consécutives qu'un crochet d'événement peut échouer avant d'être désactivé. Un échec du crochet d’événement est défini à chaque fois que le crochet d’événement lève une exception. Si un crochet d'événement est désactivé, il ne traite pas les nouveaux événements tant que le pipeline n'est pas redémarré.
max_allowable_consecutive_failures
doit être un entier supérieur ou égal à 0
ou None
. Une valeur de None
(attribuée par défaut) signifie qu’il n’y a aucune limite au nombre d’échecs consécutifs autorisés pour le crochet d’événement, et que le crochet d’événement n’est jamais désactivé.
Les échecs des crochets d’événements et la désactivation des crochets d’événements peuvent être surveillés dans le journal des événements en tant qu’événements hook_progress
.
La fonction de crochet d'événement doit être une fonction Python qui accepte exactement un paramètre, une représentation par dictionnaire de l'événement qui a déclenché ce crochet d'événement. Toute valeur de retour de la fonction de crochet d'événement est ignorée.
Exemple : sélectionner des événements spécifiques à traiter
L'exemple suivant illustre un crochet d'événement qui sélectionne des événements spécifiques à traiter. Plus précisément, cet exemple attend que les événements STOPPING
du pipeline soient reçus, puis génère un message dans les journaux du pilote stdout
.
@on_event_hook
def my_event_hook(event):
if (
event['event_type'] == 'update_progress' and
event['details']['update_progress']['state'] == 'STOPPING'
):
print('Received notification that update is stopping: ', event)
Exemple : envoyer tous les événements à une chaîne Slack
L'exemple suivant implémente un crochet d'événement qui envoie tous les événements reçus à un canal Slack à l'aide de l'API Slack.
Cet exemple utilise un secret Databricks pour stocker en toute sécurité un jeton requis pour s'authentifier auprès de l'API Slack.
from dlt import on_event_hook
import requests
# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
@on_event_hook
def write_events_to_slack(event):
res = requests.post(
url='https://slack.com/api/chat.postMessage',
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN,
},
json={
'channel': '<channel-id>',
'text': 'Received event:\n' + event,
}
)
Exemple : configurer un crochet d'événement pour le désactiver après quatre échecs consécutifs
L'exemple suivant montre comment configurer un crochet d'événement qui est désactivé s'il échoue quatre fois consécutivement.
from dlt import on_event_hook
import random
def run_failing_operation():
raise Exception('Operation has failed')
# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
run_failing_operation()
Exemple : un pipeline Delta Live Tables avec un crochet d'événement
L'exemple suivant montre l'ajout d'un crochet d'événement au code source d'un pipeline. Il s'agit d'un exemple simple mais complet d'utilisation de crochets d'événement avec un pipeline.
from dlt import table, on_event_hook, read
import requests
import json
import time
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN
}
# Create a single dataset.
@table
def test_dataset():
return spark.range(5)
# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
'channel': DEV_CHANNEL,
'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
})