Aracılığıyla paylaş


Olay kancalarıyla Delta Live Tables işlem hatlarının özel izlemesini tanımlama

Önemli

Olay kancaları desteği Genel Önizleme aşamasındadır.

Olaylar Delta Live Tables işlem hattının olay günlüğüne kalıcı hale getirildiğinde çalıştırılan özel Python geri çağırma işlevleri eklemek için olay kancalarını kullanabilirsiniz. Özel izleme ve uyarı çözümleri uygulamak için olay kancalarını kullanabilirsiniz. Örneğin, belirli olaylar gerçekleştiğinde e-posta göndermek veya günlüğe yazmak veya işlem hattı olaylarını izlemek için üçüncü taraf çözümlerle tümleştirmek için olay kancalarını kullanabilirsiniz.

Tek bir bağımsız değişkeni kabul eden bir Python işleviyle bir olay kancası tanımlarsınız; burada bağımsız değişken bir olayı temsil eden bir sözlüktür. Ardından olay kancalarını bir işlem hattının kaynak kodunun parçası olarak eklersiniz. İşlem hattında tanımlanan tüm olay kancaları, her işlem hattı güncelleştirmesi sırasında oluşturulan tüm olayları işlemeye çalışır. İşlem hattınız birden çok kaynak kodu yapıtından oluşuyorsa (örneğin, birden çok not defteri) tanımlı olay kancaları tüm işlem hattına uygulanır. Olay kancaları işlem hattınızın kaynak koduna dahil olsa da, işlem hattı grafiğine dahil değildir.

Olay kancalarını Hive meta veri deposunda veya Unity Kataloğu'nda yayımlayan işlem hatlarıyla kullanabilirsiniz.

Not

  • Python, olay kancalarını tanımlamak için desteklenen tek dildir.
  • Olay kancaları yalnızca maturity_level olduğu olaylar için tetiklenirSTABLE.
  • Olay kancaları işlem hattı güncelleştirmelerinden zaman uyumsuz olarak yürütülür, ancak diğer olay kancalarıyla zaman uyumlu olarak yürütülür. Bu, aynı anda yalnızca tek bir olay kancasının çalıştırıldığını ve diğer olay kancalarının çalışmak için o anda çalışan olay kancası tamamlanana kadar bekleyeceği anlamına gelir. Bir olay kancası süresiz olarak çalıştırılırsa, diğer tüm olay kancalarını engeller.
  • Delta Live Tables, işlem hattı güncelleştirmesi sırasında yayılan her olayda her olay kancasını çalıştırmayı dener. Gecikmeli olay kancalarının tüm kuyruğa alınan olayları işlemek için zamanı olduğundan emin olmak için Delta Live Tables, işlem hattını çalıştıran işlemi sonlandırmadan önce yapılandırılamayan sabit bir süre bekler. Ancak işlem sonlandırılmadan önce tüm olaylarda tüm kancaların tetiklendiğinden emin olunmaması gerekir.

Olay kancası işlemeyi izleme

Bir güncelleştirmenin hook_progress olay kancalarının durumunu izlemek için Delta Live Tables olay günlüğündeki olay türünü kullanın. Döngüsel bağımlılıkları önlemek için olay kancaları olaylar için hook_progress tetiklenmez.

Olay kancası tanımlama

Olay kancası tanımlamak için dekoratörü kullanın on_event_hook :

@dlt.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

, max_allowable_consecutive_failures bir olay kancasının devre dışı bırakılmadan önce art arda kaç kez başarısız olabileceğini açıklar. Olay kancası her zaman olay kancası özel durum oluşturur olarak tanımlanır. Olay kancası devre dışı bırakılırsa, işlem hattı yeniden başlatılana kadar yeni olayları işlemez.

max_allowable_consecutive_failures veya değerinden büyük veya eşit 0Nonebir tamsayı olmalıdır. değeri None (varsayılan olarak atanır), olay kancası için izin verilen ardışık hata sayısı sınırı olmadığı ve olay kancasının hiçbir zaman devre dışı bırakılmadığı anlamına gelir.

Olay kancası hataları ve olay kancalarının devre dışı bırakılması olay günlüğünde olaylar olarak hook_progress izlenebilir.

Olay kancası işlevi, bu olay kancasını tetikleyen olayın sözlük gösterimi olan tam olarak bir parametreyi kabul eden bir Python işlevi olmalıdır. Olay kancası işlevindeki herhangi bir dönüş değeri yoksayılır.

Örnek: İşlenmek üzere belirli olayları seçme

Aşağıdaki örnekte, işlenmek üzere belirli olayları seçen bir olay kancası gösterilmektedir. Özellikle, bu örnek işlem hattı STOPPING olayları alınana kadar bekler ve sürücü günlüklerine bir ileti döndürür stdout.

@on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Örnek: Tüm olayları Slack kanalına gönderme

Aşağıdaki örnek, Slack API'sini kullanarak alınan tüm olayları Slack kanalına gönderen bir olay kancası uygular.

Bu örnek, Slack API'sinde kimlik doğrulaması yapmak için gereken belirteci güvenli bir şekilde depolamak için Databricks gizli dizisini kullanır.

from dlt import on_event_hook
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Örnek: Ardışık dört hatadan sonra devre dışı bırakmak için bir olay kancası yapılandırma

Aşağıdaki örnek, ardışık olarak dört kez başarısız olursa devre dışı bırakılan bir olay kancasının nasıl yapılandırıldığını gösterir.

from dlt import on_event_hook
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Örnek: Olay kancası olan Delta Live Tables işlem hattı

Aşağıdaki örnekte, bir işlem hattının kaynak koduna olay kancası ekleme gösterilmektedir. Bu, işlem hattıyla olay kancalarını kullanmanın basit ama eksiksiz bir örneğidir.

from dlt import table, on_event_hook, read
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })