Share via


イベント フックを使用して Delta Live Tables パイプラインのカスタム監視を定義する

重要

イベント フックのサポートは、パブリック プレビュー段階です。

イベント フックを使って、イベントが Delta Live Tables パイプラインのイベント ログに保存されるときに実行されるカスタム Python コールバック関数を追加できます。 イベント フックを使って、カスタム監視とアラートのソリューションを実装できます。 たとえば、イベント フックを使って、特定のイベントが発生したときにメールの送信やログへの書き込みを行ったり、パイプライン イベントを監視するためのサード パーティ ソリューションと統合したりできます。

イベント フックは、1 つの引数を受け取る Python 関数を使って定義します。この引数はイベントを表すディクショナリです。 その後、パイプラインのソース コードの一部としてイベント フックを含めます。 パイプラインで定義されているイベント フックは、各パイプラインの更新中に生成されたすべてのイベントの処理を試みます。 パイプラインが複数のソース コード成果物 (たとえば、複数のノートブック) で構成されている場合、定義されているイベント フックはパイプライン全体に適用されます。 イベント フックは、パイプラインのソース コードには含まれていますが、パイプライン グラフには含まれていません。

イベント フックは、Hive メタストアまたは Unity Catalog に発行されるパイプラインで使用できます。

Note

  • イベント フックを定義するためにサポートされている言語は Python だけです。
  • イベント フックは、maturity_levelSTABLE であるイベントに対してのみトリガーされます。
  • イベント フックは、パイプラインの更新からは非同期に実行されますが、他のイベント フックとは同期的に実行されます。 これは、一度に実行されるイベント フックは 1 つだけであり、他のイベント フックは現在実行中のイベント フックが完了するまで実行を待機することを意味します。 1 つのイベント フックがいつまでも実行していると、他のイベント フックはすべてブロックされます。
  • Delta Live Tables は、パイプラインの更新の間に生成されるすべてのイベントに対して、各イベント フックの実行を試みます。 遅延しているイベント フックがキュー内のすべてのイベントを処理する時間を確実に得られるよう、Delta Live Tables は構成できない固定の時間だけ待機してから、パイプラインを実行しているコンピューティングを終了します。 ただし、コンピューティングが終了する前にすべてのイベントに対してすべてのフックがトリガーされることは保証されません。

イベント フックの処理を監視する

更新のイベント フックの状態を監視するには、Delta Live Tables イベント ログの hook_progress イベントの種類を使います。 循環依存関係を防ぐため、hook_progress イベントに対してはイベント フックはトリガーされません。

イベント フックを定義する

イベント フックを定義するには、on_event_hook デコレーターを使います。

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

max_allowable_consecutive_failures では、イベント フックを無効になる前にイベント フックが連続して失敗できる最大回数を指定します。 イベント フックの失敗は、イベント フックが例外をスローするすべてのときと定義されます。 無効にされたイベント フックは、パイプラインが再起動されるまで、新しいイベントを処理しません。

max_allowable_consecutive_failures は、0 以上の整数または None にする必要があります。 値 None (既定で割り当てられます) は、イベント フックに対して許容される連続した失敗の数に制限がないことを意味し、イベント フックは無効にされません。

イベント フックの失敗とイベント フックの無効化は、イベント ログで hook_progress イベントとして監視できます。

イベント フック関数は、厳密に 1 つだけパラメーターを受け取る Python 関数である必要があります。このパラメーターは、このイベント フックをトリガーしたイベントのディクショナリ表現です。 イベント フック関数からの戻り値はすべて無視されます。

例: 処理する特定のイベントを選択する

次の例では、処理対象として特定のイベントを選ぶイベント フックを示します。 具体的には、この例では、パイプラインの STOPPING イベントを受け取るまで待機した後、ドライバー ログ stdout にメッセージを出力します。

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

例: すべてのイベントを Slack チャネルに送信する

次の例では、Slack API を使って、受信したすべてのイベントを Slack チャネルに送信するイベント フックを実装します。

この例では、Databricks のシークレットを使って、Slack API への認証に必要なトークンを安全に格納します。

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

例: 4 回連続して失敗したら無効になるようにイベント フックを構成する

次の例では、連続して 4 回失敗したら無効にされるイベント フックを構成する方法を示します。

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

例: イベント フックを使用する Delta Live Tables パイプライン

次の例では、パイプラインのソース コードにイベント フックを追加する方法を示します。 これは、パイプラインでイベント フックを使用する簡単ですが完全な例です。

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })