Поделиться через


Events

Система событий рабочего процесса обеспечивает наблюдаемость выполнения рабочего процесса. События выдаются в ключевых точках во время выполнения и могут использоваться в режиме реального времени через потоковое вещание.

Встроенные типы событий

// Workflow lifecycle events
WorkflowStartedEvent     // Workflow execution begins
WorkflowOutputEvent      // Workflow outputs data
WorkflowErrorEvent       // Workflow encounters an error
WorkflowWarningEvent     // Workflow encountered a warning

// Executor events
ExecutorInvokedEvent     // Executor starts processing
ExecutorCompletedEvent   // Executor finishes processing
ExecutorFailedEvent      // Executor encounters an error
AgentResponseEvent       // An agent run produces output
AgentResponseUpdateEvent // An agent run produces a streaming update

// Superstep events
SuperStepStartedEvent    // Superstep begins
SuperStepCompletedEvent  // Superstep completes

// Request events
RequestInfoEvent         // A request is issued
# All events use the unified WorkflowEvent class with a type discriminator:

# Workflow lifecycle events
WorkflowEvent.type == "started"             # Workflow execution begins
WorkflowEvent.type == "status"              # Workflow state changed (use .state)
WorkflowEvent.type == "output"              # Workflow produces an output
WorkflowEvent.type == "failed"              # Workflow terminated with error (use .details)
WorkflowEvent.type == "error"               # Non-fatal error from user code
WorkflowEvent.type == "warning"             # Workflow encountered a warning

# Executor events
WorkflowEvent.type == "executor_invoked"    # Executor starts processing
WorkflowEvent.type == "executor_completed"  # Executor finishes processing
WorkflowEvent.type == "executor_failed"     # Executor encounters an error
WorkflowEvent.type == "data"                # Executor emitted data (e.g., AgentResponse)

# Superstep events
WorkflowEvent.type == "superstep_started"   # Superstep begins
WorkflowEvent.type == "superstep_completed" # Superstep completes

# Request events
WorkflowEvent.type == "request_info"        # A request is issued

Использование событий

using Microsoft.Agents.AI.Workflows;

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    switch (evt)
    {
        case ExecutorInvokedEvent invoke:
            Console.WriteLine($"Starting {invoke.ExecutorId}");
            break;

        case ExecutorCompletedEvent complete:
            Console.WriteLine($"Completed {complete.ExecutorId}: {complete.Data}");
            break;

        case WorkflowOutputEvent output:
            Console.WriteLine($"Workflow output: {output.Data}");
            return;

        case WorkflowErrorEvent error:
            Console.WriteLine($"Workflow error: {error.Exception}");
            return;
    }
}
from agent_framework import WorkflowEvent

async for event in workflow.run_stream(input_message):
    if event.type == "executor_invoked":
        print(f"Starting {event.executor_id}")
    elif event.type == "executor_completed":
        print(f"Completed {event.executor_id}: {event.data}")
    elif event.type == "output":
        print(f"Workflow produced output: {event.data}")
        return
    elif event.type == "error":
        print(f"Workflow error: {event.data}")
        return

Пользовательские события

Пользовательские события позволяют исполнителям выдавать сигналы, относящиеся к домену, во время выполнения рабочего процесса, адаптированные к потребностям приложения. Ниже приведены некоторые примеры вариантов использования:

  • Отслеживание хода выполнения — отображайте промежуточные шаги, чтобы звонящие могли показывать обновления состояния.
  • Вывод диагностики — выводит предупреждения, метрики или отладочные сведения, не изменяя выходные данные рабочего процесса.
  • Данные домена ретранслятора — отправка структурированных полезных данных (например, записи баз данных, вызовы инструментов) прослушивателям в режиме реального времени.

Определение пользовательских событий

Определите настраиваемое событие созданием подкласса WorkflowEvent. Базовый конструктор принимает необязательную нагрузку object? data, которая становится доступной через свойство Data.

using Microsoft.Agents.AI.Workflows;

// Simple event with a string payload
internal sealed class ProgressEvent(string step) : WorkflowEvent(step) { }

// Event with a structured payload
internal sealed class MetricsEvent(MetricsData metrics) : WorkflowEvent(metrics) { }

В Python, создайте настраиваемые события, используя класс WorkflowEvent непосредственно с строкой-дискриминатором пользовательского типа. type Параметры data содержат все сведения.

from agent_framework import WorkflowEvent

# Create a custom event with a custom type string and payload
event = WorkflowEvent(type="progress", data="Step 1 complete")

# Custom event with a structured payload
event = WorkflowEvent(type="metrics", data={"latency_ms": 42, "tokens": 128})

Замечание

Типы событий "started", "status" и "failed" зарезервированы для уведомлений жизненного цикла фреймворка. Если исполнитель пытается выпустить один из этих типов, событие игнорируется и регистрируется предупреждение.

Генерация пользовательских событий

Вызовите кастомные события из обработчика сообщений исполнителя с помощью функции AddEventAsync на IWorkflowContext.

using Microsoft.Agents.AI.Workflows;

internal sealed class ProgressEvent(string step) : WorkflowEvent(step) { }

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        await context.AddEventAsync(new ProgressEvent("Validating input"));

        // Executor logic...

        await context.AddEventAsync(new ProgressEvent("Processing complete"));
    }
}

Передайте пользовательские события из обработчика событий, вызвав add_event на WorkflowContext.

from agent_framework import (
    handler,
    Executor,
    WorkflowContext,
    WorkflowEvent,
)

class CustomExecutor(Executor):

    @handler
    async def handle(self, message: str, ctx: WorkflowContext[str]) -> None:
        await ctx.add_event(WorkflowEvent(type="progress", data="Validating input"))

        # Executor logic...

        await ctx.add_event(WorkflowEvent(type="progress", data="Processing complete"))

Использование пользовательских событий

Используйте сопоставление шаблонов для фильтрации пользовательского типа события в потоке событий:

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    switch (evt)
    {
        case ProgressEvent progress:
            Console.WriteLine($"Progress: {progress.Data}");
            break;

        case WorkflowOutputEvent output:
            Console.WriteLine($"Done: {output.Data}");
            return;
    }
}

Фильтрация по строке дискриминатора пользовательского типа:

async for event in workflow.run(input_message, stream=True):
    if event.type == "progress":
        print(f"Progress: {event.data}")
    elif event.type == "output":
        print(f"Done: {event.data}")
        return

Дальнейшие шаги

См. также: