Bagikan melalui


Menentukan pemantauan kustom alur Tabel Langsung Delta dengan hook peristiwa

Penting

Dukungan untuk hook peristiwa ada di Pratinjau Umum.

Anda dapat menggunakan hook peristiwa untuk menambahkan fungsi panggilan balik Python kustom yang berjalan saat peristiwa dipertahankan ke log peristiwa alur Delta Live Tables. Anda dapat menggunakan kait peristiwa untuk mengimplementasikan solusi pemantauan dan pemberitahuan kustom. Misalnya, Anda dapat menggunakan hook peristiwa untuk mengirim email atau menulis ke log saat peristiwa tertentu terjadi atau untuk berintegrasi dengan solusi pihak ketiga untuk memantau peristiwa alur.

Anda menentukan hook peristiwa dengan fungsi Python yang menerima satu argumen, di mana argumen adalah kamus yang mewakili peristiwa. Anda kemudian menyertakan hook peristiwa sebagai bagian dari kode sumber untuk alur. Setiap kait peristiwa yang ditentukan dalam alur akan mencoba memproses semua peristiwa yang dihasilkan selama setiap pembaruan alur. Jika alur Anda terdiri dari beberapa artefak kode sumber, misalnya, beberapa notebook, setiap hook peristiwa yang ditentukan diterapkan ke seluruh alur. Meskipun kait peristiwa disertakan dalam kode sumber untuk alur Anda, kait tersebut tidak disertakan dalam grafik alur.

Anda dapat menggunakan hook peristiwa dengan alur yang diterbitkan ke metastore Apache Hive atau Katalog Unity.

Catatan

  • Python adalah satu-satunya bahasa yang didukung untuk menentukan kait peristiwa.
  • Kait peristiwa dipicu hanya untuk peristiwa di mana maturity_level adalah STABLE.
  • Kait peristiwa dijalankan secara asinkron dari pembaruan alur tetapi secara sinkron dengan kait peristiwa lainnya. Ini berarti bahwa hanya satu kait peristiwa yang berjalan pada satu waktu, dan hook peristiwa lainnya menunggu untuk berjalan hingga hook peristiwa yang sedang berjalan selesai. Jika hook peristiwa berjalan tanpa batas waktu, itu memblokir semua kait peristiwa lainnya.
  • Tabel Langsung Delta mencoba menjalankan setiap hook peristiwa pada setiap peristiwa yang dikeluarkan selama pembaruan alur. Untuk membantu memastikan bahwa kait peristiwa yang tertinggal memiliki waktu untuk memproses semua peristiwa yang diantrekan, Delta Live Tables menunggu periode tetap yang tidak dapat dikonfigurasi sebelum mengakhiri komputasi yang menjalankan alur. Namun, tidak dijamin bahwa semua kait dipicu pada semua peristiwa sebelum komputasi dihentikan.

Memantau pemrosesan kait peristiwa

hook_progress Gunakan jenis peristiwa di log peristiwa Tabel Langsung Delta untuk memantau status hook peristiwa pembaruan. Untuk mencegah dependensi melingkar, kait peristiwa tidak dipicu untuk hook_progress peristiwa.

Menentukan hook peristiwa

Untuk menentukan hook peristiwa, gunakan on_event_hook dekorator:

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

menjelaskan max_allowable_consecutive_failures jumlah maksimum kali berturut-turut kait peristiwa dapat gagal sebelum dinonaktifkan. Kegagalan kait peristiwa didefinisikan sebagai kapan saja kait peristiwa melemparkan pengecualian. Jika hook peristiwa dinonaktifkan, itu tidak memproses peristiwa baru sampai alur dimulai ulang.

max_allowable_consecutive_failures harus berupa bilangan bulat yang lebih besar dari atau sama dengan 0 atau None. Nilai None (ditetapkan secara default) berarti tidak ada batasan jumlah kegagalan berturut-turut yang diizinkan untuk hook peristiwa, dan hook peristiwa tidak pernah dinonaktifkan.

Kegagalan kait peristiwa dan pennonaktifkan kait peristiwa dapat dipantau di log peristiwa sebagai hook_progress peristiwa.

Fungsi hook peristiwa harus berupa fungsi Python yang menerima tepat satu parameter, representasi kamus peristiwa yang memicu kait peristiwa ini. Nilai pengembalian apa pun dari fungsi kait peristiwa diabaikan.

Contoh: Pilih peristiwa tertentu untuk diproses

Contoh berikut menunjukkan kait peristiwa yang memilih peristiwa tertentu untuk diproses. Secara khusus, contoh ini menunggu hingga peristiwa alur STOPPING diterima dan kemudian menghasilkan pesan ke log stdoutdriver .

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Contoh: Mengirim semua peristiwa ke saluran Slack

Contoh berikut mengimplementasikan kait peristiwa yang mengirim semua peristiwa yang diterima ke saluran Slack menggunakan API Slack.

Contoh ini menggunakan rahasia Databricks untuk menyimpan token yang diperlukan untuk mengautentikasi ke API Slack dengan aman.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Contoh: Mengonfigurasi hook peristiwa untuk dinonaktifkan setelah empat kegagalan berturut-turut

Contoh berikut menunjukkan cara mengonfigurasi kait peristiwa yang dinonaktifkan jika gagal secara berturut-turut empat kali.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Contoh: Alur Tabel Langsung Delta dengan hook peristiwa

Contoh berikut menunjukkan penambahan kait peristiwa ke kode sumber untuk alur. Ini adalah contoh sederhana tetapi lengkap menggunakan kait peristiwa dengan alur.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })