Definiowanie niestandardowego monitorowania potoków tabel delta Live Tables za pomocą punktów zaczepienia zdarzeń

Ważne

Obsługa punktów zaczepienia zdarzeń jest dostępna w publicznej wersji zapoznawczej.

Za pomocą punktów zaczepienia zdarzeń można dodawać niestandardowe funkcje wywołania zwrotnego języka Python, które są uruchamiane, gdy zdarzenia są utrwalane w dzienniku zdarzeń potoku delta Live Tables. Za pomocą punktów zaczepienia zdarzeń można zaimplementować niestandardowe rozwiązania do monitorowania i zgłaszania alertów. Można na przykład użyć punktów zaczepienia zdarzeń, aby wysyłać wiadomości e-mail lub zapisywać w dzienniku, gdy wystąpią określone zdarzenia lub zintegrować z rozwiązaniami innych firm w celu monitorowania zdarzeń potoku.

Element zaczepienia zdarzeń definiuje się za pomocą funkcji języka Python, która akceptuje jeden argument, gdzie argument jest słownikiem reprezentującym zdarzenie. Następnie dołączysz punkt zaczepienia zdarzeń jako część kodu źródłowego potoku. Wszelkie przypinania zdarzeń zdefiniowane w potoku będą podejmować próbę przetworzenia wszystkich zdarzeń generowanych podczas każdej aktualizacji potoku. Jeśli potok składa się z wielu artefaktów kodu źródłowego, na przykład wielu notesów, wszystkie zdefiniowane haki zdarzeń są stosowane do całego potoku. Mimo że w kodzie źródłowym potoku znajdują się haki zdarzeń, nie są one uwzględnione w grafie potoku.

Za pomocą punktów zaczepienia zdarzeń można używać potoków publikowanych w magazynie metadanych Programu Hive lub wykazie aparatu Unity.

Uwaga

  • Język Python jest jedynym językiem obsługiwanym do definiowania punktów zaczepienia zdarzeń.
  • Haki zdarzeń są wyzwalane tylko w przypadku zdarzeń, w których maturity_level to STABLE.
  • Zaczepienia zdarzeń są wykonywane asynchronicznie z aktualizacji potoku, ale synchronicznie z innymi punktami zaczepienia zdarzeń. Oznacza to, że tylko jeden punkt zaczepienia zdarzeń jest uruchamiany w danym momencie, a inne haki zdarzeń czekają na uruchomienie do momentu ukończenia aktualnie uruchomionego elementu zaczepienia zdarzeń. Jeśli punkt zaczepienia zdarzeń jest uruchamiany w nieskończoność, blokuje wszystkie inne zaczepienia zdarzeń.
  • Delta Live Tables próbuje uruchomić każdy punkt zaczepienia zdarzeń dla każdego zdarzenia emitowanego podczas aktualizacji potoku. Aby zapewnić, że zaczepienia zdarzeń opóźniające mają czas na przetworzenie wszystkich zdarzeń w kolejce, delty tabele na żywo oczekują na niekonfigurowalny stały okres przed zakończeniem obliczeń z uruchomionym potokiem. Nie ma jednak gwarancji, że wszystkie haki są wyzwalane na wszystkich zdarzeniach przed zakończeniem obliczeń.

Monitorowanie przetwarzania punktów zaczepienia zdarzeń

hook_progress Użyj typu zdarzenia w dzienniku zdarzeń delta Live Tables, aby monitorować stan punktów zaczepienia zdarzeń aktualizacji. Aby zapobiec zależnościom cyklicznym, haki zdarzeń nie są wyzwalane dla hook_progress zdarzeń.

Definiowanie elementu zaczepienia zdarzeń

Aby zdefiniować punkt zaczepienia zdarzeń, użyj dekoratora on_event_hook :

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

Opisuje max_allowable_consecutive_failures maksymalną liczbę kolejnych razy, gdy punkt zaczepienia zdarzeń może zakończyć się niepowodzeniem, zanim zostanie wyłączony. Awaria haka zdarzeń jest definiowana w dowolnym momencie, gdy punkt zaczepienia zdarzeń zgłasza wyjątek. Jeśli punkt zaczepienia zdarzeń jest wyłączony, nie przetwarza nowych zdarzeń do momentu ponownego uruchomienia potoku.

max_allowable_consecutive_failures musi być liczbą całkowitą większą lub równą 0 lub None. Wartość None (przypisana domyślnie) oznacza, że nie ma limitu liczby kolejnych niepowodzeń dozwolonych dla haka zdarzeń, a punkt zaczepienia zdarzeń nigdy nie jest wyłączony.

Błędy haka zdarzeń i wyłączanie punktów zaczepienia zdarzeń można monitorować w dzienniku zdarzeń jako hook_progress zdarzenia.

Funkcja haka zdarzeń musi być funkcją języka Python, która akceptuje dokładnie jeden parametr, czyli słownikową reprezentację zdarzenia, które wyzwoliło ten element zaczepienia zdarzeń. Każda wartość zwracana z funkcji punktu zaczepienia zdarzeń jest ignorowana.

Przykład: Wybieranie określonych zdarzeń do przetwarzania

W poniższym przykładzie pokazano punkt zaczepienia zdarzeń, który wybiera określone zdarzenia do przetwarzania. W szczególności ten przykład oczekuje na odebranie zdarzeń potoku STOPPING , a następnie wyprowadza komunikat do dzienników sterowników stdout.

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Przykład: wysyłanie wszystkich zdarzeń do kanału usługi Slack

W poniższym przykładzie zaimplementowano punkt zaczepienia zdarzeń, który wysyła wszystkie zdarzenia odebrane do kanału usługi Slack przy użyciu interfejsu API usługi Slack.

W tym przykładzie użyto wpisu tajnego usługi Databricks do bezpiecznego przechowywania tokenu wymaganego do uwierzytelnienia w interfejsie API usługi Slack.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Przykład: Konfigurowanie elementu zaczepienia zdarzeń w celu wyłączenia po czterech kolejnych awariach

W poniższym przykładzie pokazano, jak skonfigurować punkt zaczepienia zdarzeń, który jest wyłączony, jeśli niepowodzenie zakończy się niepowodzeniem cztery razy.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Przykład: potok delta live tables z hakiem zdarzeń

W poniższym przykładzie pokazano dodanie elementu zaczepienia zdarzeń do kodu źródłowego potoku. Jest to prosty, ale kompletny przykład użycia punktów zaczepienia zdarzeń z potokiem.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })