Share via


使用事件攔截定義 Delta Live Tables 管線的自定義監視

重要

事件攔截的支持處於 公開預覽狀態

您可以使用 事件攔截 來新增自定義 Python 回呼函式,以在事件保存至 Delta Live Tables 管線 的事件記錄檔時執行。 您可以使用事件攔截來實作自定義監視和警示解決方案。 例如,您可以使用事件攔截在發生特定事件時傳送電子郵件或寫入記錄檔,或與第三方解決方案整合,以監視管線事件。

您可以使用接受單一自變數的 Python 函式定義事件攔截,其中自變數是代表事件的字典。 接著,您會在管線的原始程式碼中包含事件攔截。 管線中定義的任何事件攔截都會嘗試處理每個管線更新期間所產生的所有事件。 如果您的管線是由多個原始程式碼成品所組成,例如多個筆記本,則會將任何已定義的事件攔截套用至整個管線。 雖然事件攔截會包含在管線的原始程式碼中,但它們不會包含在管線圖形中。

您可以將事件攔截與發佈至 Hive 中繼存放區或 Unity 目錄的管線搭配使用。

注意

  • Python 是唯一支援定義事件攔截的語言。
  • 事件攔截只會針對maturity_levelSTABLE的事件觸發。
  • 事件攔截會從管線更新異步執行,但與其他事件攔截同步執行。 這表示一次只會執行單一事件攔截,而其他事件攔截會等候執行,直到目前正在執行的事件攔截完成為止。 如果事件攔截無限期執行,它會封鎖所有其他事件攔截。
  • Delta Live Tables 會嘗試在管線更新期間發出的每個事件上執行每個事件攔截。 為了協助確保延遲事件攔截有時間處理所有佇列事件,Delta Live Tables 會在終止執行管線的計算之前,等候不可設定的固定期間。 不過,不保證在計算終止之前,所有事件都會觸發所有攔截。

監視事件攔截處理

hook_progress使用 Delta Live Tables 事件記錄檔中的事件類型來監視更新事件攔截的狀態。 為了避免迴圈相依性,事件不會觸發 hook_progress 事件攔截。

定義事件攔截

若要定義事件攔截,請使用 on_event_hook 裝飾專案:

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

max_allowable_consecutive_failures描述事件攔截在停用之前可能會失敗的連續次數上限。 事件攔截失敗定義為每當事件攔截擲回例外狀況時。 如果事件攔截已停用,則在重新啟動管線之前,它不會處理新的事件。

max_allowable_consecutive_failures 必須是大於或等於 0 或的 None整數。 值 None (預設指派) 表示事件攔截允許的連續失敗數目沒有限制,而且永遠不會停用事件攔截。

事件攔截失敗和停用事件攔截可以在事件記錄檔中監視為 hook_progress 事件。

事件攔截函式必須是 Python 函式,可只接受一個參數,這是觸發此事件攔截之事件的字典表示法。 會忽略事件攔截函式的任何傳回值。

範例:選取要處理的特定事件

下列範例示範事件攔截,可選取要處理的特定事件。 具體而言,此範例會等到收到管線 STOPPING 事件,然後將訊息輸出至驅動程序記錄 stdout

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

範例:將所有事件傳送至 Slack 通道

下列範例會實作事件攔截,以使用 Slack API 將接收的所有事件傳送至 Slack 通道。

此範例會使用 Databricks 秘密 安全地儲存向 Slack API 進行驗證所需的令牌。

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

範例:設定事件攔截以在連續四次失敗后停用

下列範例示範如何設定事件攔截,如果事件攔截連續四次失敗,就會停用。

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

範例:具有事件攔截的 Delta Live Tables 管線

下列範例示範如何將事件攔截新增至管線的原始程式碼。 這是搭配管線使用事件攔截的簡單但完整範例。

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })