Поделиться через


Определение пользовательского мониторинга конвейеров Delta Live Tables с помощью перехватчиков событий

Внимание

Поддержка перехватчиков событий доступна в общедоступной предварительной версии.

Можно использовать перехватчики событий для добавления пользовательских функций обратного вызова Python, которые выполняются при сохранении событий в журнал событий конвейера delta Live Tables. Вы можете использовать перехватчики событий для реализации пользовательских решений мониторинга и оповещений. Например, можно использовать перехватчики событий для отправки сообщений электронной почты или записи в журнал при возникновении определенных событий или интеграции с сторонними решениями для мониторинга событий конвейера.

Вы определяете перехватчик событий с функцией Python, которая принимает один аргумент, где аргумент является словарем, представляющим событие. Затем вы включаете перехватчики событий в состав исходного кода для конвейера. Все перехватчики событий, определенные в конвейере, попытаются обработать все события, созданные во время каждого обновления конвейера. Если конвейер состоит из нескольких артефактов исходного кода, например нескольких записных книжек, к всему конвейеру применяются все определенные перехватчики событий. Хотя перехватчики событий включены в исходный код для конвейера, они не включены в граф конвейера.

Вы можете использовать перехватчики событий с конвейерами, которые публикуются в хранилище метаданных Hive или каталоге Unity.

Примечание.

  • Python — единственный язык, поддерживаемый для определения перехватчиков событий.
  • Перехватчики событий активируются только для событий, в которых maturity_levelSTABLE.
  • Перехватчики событий выполняются асинхронно из обновлений конвейера, но синхронно с другими перехватчиками событий. Это означает, что одновременно выполняется только один перехватчик событий, а другие перехватчики событий ожидают выполнения до завершения перехватчика событий в данный момент. Если перехватчик событий выполняется неограниченное время, он блокирует все остальные перехватчики событий.
  • Разностные динамические таблицы пытаются запустить каждый перехват событий во время обновления конвейера. Чтобы убедиться, что перехватчики событий заметок имеют время обрабатывать все события очереди, Delta Live Tables ожидает не настраиваемого фиксированного периода перед завершением вычислений, выполняющих конвейер. Однако не гарантируется, что все перехватчики активируются во всех событиях до завершения вычисления.

Мониторинг обработки перехватчика событий

hook_progress Используйте тип события в журнале событий Delta Live Tables для отслеживания состояния перехватчиков событий обновления. Чтобы предотвратить циклические зависимости, перехватчики событий не активируются для hook_progress событий.

Определение перехватчика событий

Чтобы определить перехватчик событий, используйте on_event_hook декоратор:

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

В этом max_allowable_consecutive_failures разделе описывается максимальное число последовательных попыток перехватчика событий, прежде чем оно отключено. Сбой перехватчика событий определяется в любой момент, когда перехватчик событий создает исключение. Если перехватчик событий отключен, он не обрабатывает новые события до перезапуска конвейера.

max_allowable_consecutive_failures должно быть целым числом, превышающим или равным 0 или Noneравным. Значение None (назначенное по умолчанию) означает, что число последовательных сбоев, разрешенных для перехватчика событий, не ограничено, а перехватчик событий никогда не отключается.

Сбои перехвата событий и отключение перехватчиков событий можно отслеживать в журнале событий как hook_progress события.

Функция перехватчика событий должна быть функцией Python, которая принимает ровно один параметр, словарное представление события, которое активировало этот перехватчик событий. Любое возвращаемое значение функции перехватчика событий игнорируется.

Пример. Выбор определенных событий для обработки

В следующем примере показан перехватчик событий, который выбирает определенные события для обработки. В частности, в этом примере ожидается получение событий конвейера STOPPING , а затем выводится сообщение в журналы stdoutдрайверов.

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Пример. Отправка всех событий в канал Slack

В следующем примере реализован перехватчик событий, который отправляет все события, полученные в канал Slack с помощью API Slack.

В этом примере используется секрет Databricks для безопасного хранения маркера, необходимого для проверки подлинности в API Slack.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Пример. Настройка перехватчика событий для отключения после четырех последовательных сбоев

В следующем примере показано, как настроить перехватчик событий, который отключен, если он завершается последовательно четыре раза.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Пример: конвейер Delta Live Tables с перехватчиком события

В следующем примере показано добавление перехватчика событий в исходный код конвейера. Это простой, но полный пример использования перехватчиков событий с конвейером.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })