Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Anda dapat menggunakan hook peristiwa untuk menambahkan fungsi callback Python kustom yang dijalankan ketika peristiwa disimpan ke dalam log peristiwa dalam suatu alur. Anda dapat menggunakan kait peristiwa untuk mengimplementasikan solusi pemantauan dan pemberitahuan kustom. Misalnya, Anda dapat menggunakan hook peristiwa untuk mengirim email atau menulis ke log saat peristiwa tertentu terjadi atau untuk berintegrasi dengan solusi pihak ketiga untuk memantau peristiwa alur.
Tentukan hook peristiwa dengan fungsi Python yang menerima satu argumen, di mana argumen adalah kamus yang mewakili peristiwa. Kemudian sertakan event hooks ke dalam kode sumber dari sebuah pipeline. Setiap kait peristiwa yang ditentukan dalam alur akan mencoba memproses semua peristiwa yang dihasilkan selama setiap pembaruan alur. Jika alur Anda terdiri dari beberapa file kode sumber, setiap hook peristiwa yang ditentukan diterapkan ke seluruh alur. Meskipun kait peristiwa disertakan dalam kode sumber untuk alur Anda, kait tersebut tidak disertakan dalam grafik alur.
Anda dapat menggunakan pengait acara dengan alur kerja yang diterbitkan ke Metastore Apache Hive atau Katalog Unity.
Nota
- Python adalah satu-satunya bahasa yang didukung untuk menentukan kait peristiwa. Untuk menentukan fungsi Python kustom yang memproses peristiwa dalam alur yang diimplementasikan menggunakan antarmuka SQL, tambahkan fungsi kustom dalam file sumber Python terpisah yang berjalan sebagai bagian dari alur. Fungsi Python diterapkan ke seluruh alur saat alur berjalan.
- Pengait peristiwa dipicu hanya untuk peristiwa ketika tingkat kematangan adalah
STABLE. - Kait peristiwa dijalankan secara asinkron dengan pembaruan proses tetapi secara sinkron dengan kait peristiwa lainnya. Ini berarti bahwa hanya satu hook acara yang berjalan pada satu waktu, dan hook acara lainnya menunggu dieksekusi hingga hook acara tersebut selesai. Jika sebuah event hook berjalan tak terbatas, itu memblokir semua event hook lainnya.
- Lakeflow Spark Declarative Pipelines (SDP) berusaha menjalankan setiap pengait peristiwa pada setiap peristiwa yang dikeluarkan selama pembaruan alur. Untuk membantu memastikan bahwa kait peristiwa yang tertunda memiliki waktu untuk memproses semua peristiwa yang diantrekan, SDP menunggu periode tetap yang tidak dapat dikonfigurasi sebelum mengakhiri komputasi yang menjalankan alur pemrosesan. Namun, tidak dijamin bahwa semua kait dipicu pada semua peristiwa sebelum komputasi dihentikan.
Memantau pemrosesan kait peristiwa
Gunakan jenis peristiwa hook_progress di log peristiwa pipeline untuk memantau status hook peristiwa pembaruan. Untuk mencegah dependensi melingkar, kait peristiwa tidak dipicu untuk peristiwa hook_progress.
Tentukan peristiwa hook
Untuk menentukan event hook, gunakan dekorator on_event_hook:
@dp.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
# Python code defining the event hook
max_allowable_consecutive_failures menggambarkan jumlah maksimum kali berturut-turut sebuah kait peristiwa dapat gagal sebelum dinonaktifkan. Kegagalan kait peristiwa didefinisikan sebagai kapan saja kait peristiwa melemparkan pengecualian. Jika hook peristiwa dinonaktifkan, itu tidak memproses peristiwa baru sampai alur dimulai ulang.
max_allowable_consecutive_failures harus berupa bilangan bulat yang lebih besar dari atau sama dengan 0 atau None. Nilai None (ditetapkan secara default) berarti jumlah kegagalan berturut-turut yang diizinkan untuk hook peristiwa tidak terbatas, dan hook peristiwa tidak pernah dinonaktifkan.
Kegagalan kait peristiwa dan penonaktifan kait peristiwa dapat dipantau dalam log peristiwa sebagai peristiwa hook_progress.
Fungsi hook peristiwa harus berupa fungsi Python yang menerima tepat satu parameter, yaitu representasi dalam bentuk kamus dari peristiwa yang memicu fungsi hook peristiwa ini. Nilai pengembalian apa pun dari fungsi kait peristiwa diabaikan.
Contoh: Pilih peristiwa tertentu untuk diproses
Contoh berikut menunjukkan kait peristiwa yang memilih peristiwa tertentu untuk diproses. Khususnya, contoh ini menunggu hingga peristiwa alur STOPPING diterima dan kemudian mengeluarkan pesan ke log pengemudi stdout.
@dp.on_event_hook
def my_event_hook(event):
if (
event['event_type'] == 'update_progress' and
event['details']['update_progress']['state'] == 'STOPPING'
):
print('Received notification that update is stopping: ', event)
Contoh: Mengirim semua peristiwa ke saluran Slack
Contoh berikut mengimplementasikan kait peristiwa yang mengirim semua peristiwa yang diterima ke saluran Slack menggunakan API Slack.
Contoh ini menggunakan databricks rahasia untuk menyimpan token yang diperlukan untuk mengautentikasi ke API Slack dengan aman.
from pyspark import pipelines as dp
import requests
# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
@dp.on_event_hook
def write_events_to_slack(event):
res = requests.post(
url='https://slack.com/api/chat.postMessage',
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN,
},
json={
'channel': '<channel-id>',
'text': 'Received event:\n' + event,
}
)
Contoh: Mengonfigurasi hook peristiwa untuk dinonaktifkan setelah empat kegagalan berturut-turut
Contoh berikut menunjukkan cara mengonfigurasi kait peristiwa yang dinonaktifkan jika gagal secara berturut-turut empat kali.
from pyspark import pipelines as dp
import random
def run_failing_operation():
raise Exception('Operation has failed')
# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@dp.on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
run_failing_operation()
Contoh: Jalur dengan pengait peristiwa
Contoh berikut menunjukkan penambahan kait peristiwa ke kode sumber untuk alur. Ini adalah contoh sederhana tetapi lengkap menggunakan kait peristiwa dengan alur.
from pyspark import pipelines as dp
import requests
import json
import time
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + API_TOKEN
}
# Create a single dataset.
@dp.table
def test_dataset():
return spark.range(5)
# Definition of event hook to send events to a Slack channel.
@dp.on_event_hook
def write_events_to_slack(event):
res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
'channel': DEV_CHANNEL,
'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
})