Von Bedeutung
イベント フックのサポートは パブリック プレビュー段階です。
イベント フックを使用して、イベントがパイプラインのイベント ログに保持されるときに実行されるカスタム Python コールバック関数を追加できます。 イベント フックを使用して、カスタム監視およびアラート ソリューションを実装できます。 たとえば、イベント フックを使用して、特定のイベントが発生したときに電子メールを送信したり、ログに書き込んだりしたり、パイプライン イベントを監視するためにサードパーティソリューションと統合したりできます。
1 つの引数を受け取る Python 関数を使用してイベント フックを定義します。引数はイベントを表すディクショナリです。 その後、パイプラインのソース コードの一部としてイベント フックを含めます。 パイプラインで定義されているイベント フックは、各パイプラインの更新中に生成されたすべてのイベントの処理を試みます。 パイプラインが複数のソース コード成果物 (たとえば、複数のノートブック) で構成されている場合は、定義されたイベント フックがパイプライン全体に適用されます。 イベント フックはパイプラインのソース コードに含まれていますが、パイプライン グラフには含まれません。
Hive メタストアまたは Unity カタログに発行するパイプラインでイベント フックを使用できます。
注
- Python は、イベント フックを定義するためにサポートされている唯一の言語です。 SQL インターフェイスを使用して実装されたパイプライン内のイベントを処理するカスタム Python 関数を定義するには、パイプラインの一部として実行される別の Python ノートブックにカスタム関数を追加します。 Python 関数は、パイプラインの実行時にパイプライン全体に適用されます。
- イベント フックは、イベントに対してのみトリガーされます。
- イベント フックは、パイプラインの更新から非同期的に実行されますが、他のイベント フックと同期的に実行されます。 つまり、一度に実行されるイベント フックは 1 つだけであり、他のイベント フックは現在実行中のイベント フックが完了するまで実行を待機します。 イベント フックが無期限に実行されると、他のすべてのイベント フックがブロックされます。
- Lakeflow 宣言型パイプラインは、パイプラインの更新中に生成されたすべてのイベントに対して、各イベント フックの実行を試みます。 遅延イベント フックがキューに登録されたすべてのイベントを処理する時間を確保するために、Lakeflow 宣言パイプラインは、パイプラインを実行しているコンピューティングを終了する前に、構成不可能な固定期間を待機します。 ただし、コンピューティングが終了する前にすべてのイベントですべてのフックがトリガーされるとは限りません。
イベント フック処理を監視する
更新プログラムのイベント フックの状態を監視するには、Lakeflow 宣言パイプライン イベント ログの hook_progress
イベントの種類を使用します。 循環依存関係を防ぐために、イベントフックは hook_progress
イベントに対してトリガーされません。
イベント フックを定義する
イベント フックを定義するには、 on_event_hook
デコレーターを使用します。
@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
# Python code defining the event hook
max_allowable_consecutive_failures
は、イベント フックが無効になる前に失敗する可能性がある連続した回数の最大数を示します。 イベント フックエラーは、イベント フックが例外をスローするたびに定義されます。 イベント フックが無効になっている場合、パイプラインが再起動されるまで新しいイベントは処理されません。
max_allowable_consecutive_failures
は、 0
または None
以上の整数である必要があります。
None
の値 (既定で割り当てられます) は、イベント フックに対して許可される連続する障害の数に制限がないことを意味し、イベント フックは無効にされません。
イベント フックの失敗とイベント フックの無効化は、イベント ログで hook_progress
イベントとして監視できます。
イベント フック関数は、1 つのパラメーターを受け取る Python 関数である必要があります。これは、このイベント フックをトリガーしたイベントのディクショナリ表現です。 イベント フック関数からの戻り値は無視されます。
例: 処理する特定のイベントを選択する
次の例は、処理する特定のイベントを選択するイベント フックを示しています。 具体的には、この例では、パイプライン STOPPING
イベントが受信されるまで待機し、 stdout
ドライバー ログにメッセージを出力します。
@on_event_hook
def my_event_hook(event):
if (
event['event_type'] == 'update_progress' and
event['details']['update_progress']['state'] == 'STOPPING'
):
print('Received notification that update is stopping: ', event)
例: Slack チャネルにすべてのイベントを送信する
次の例では、Slack API を使用して Slack チャネルに受信したすべてのイベントを送信するイベント フックを実装します。
この例では、Databricks シークレット を使用して、Slack API に対する認証に必要なトークンを安全に格納します。
from dlt import on_event_hook
import requests
# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
@on_event_hook
def write_events_to_slack(event):
res = requests.post(
url='https://slack.com/api/chat.postMessage',
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN,
},
json={
'channel': '<channel-id>',
'text': 'Received event:\n' + event,
}
)
例: 4 回連続して失敗した後に無効にするようにイベント フックを構成する
次の例では、連続して 4 回失敗した場合に無効になるイベント フックを構成する方法を示します。
from dlt import on_event_hook
import random
def run_failing_operation():
raise Exception('Operation has failed')
# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
run_failing_operation()
例: イベント フックを使用した Lakeflow 宣言パイプライン
次の例では、パイプラインのソース コードにイベント フックを追加する方法を示します。 これは、パイプラインでイベント フックを使用する単純で完全な例です。
from dlt import table, on_event_hook, read
import requests
import json
import time
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN
}
# Create a single dataset.
@table
def test_dataset():
return spark.range(5)
# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
'channel': DEV_CHANNEL,
'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
})