Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Система событий рабочего процесса обеспечивает наблюдаемость выполнения рабочего процесса. События выдаются в ключевых точках во время выполнения и могут использоваться в режиме реального времени через потоковое вещание.
Встроенные типы событий
// Workflow lifecycle events
WorkflowStartedEvent // Workflow execution begins
WorkflowOutputEvent // Workflow outputs data
WorkflowErrorEvent // Workflow encounters an error
WorkflowWarningEvent // Workflow encountered a warning
// Executor events
ExecutorInvokedEvent // Executor starts processing
ExecutorCompletedEvent // Executor finishes processing
ExecutorFailedEvent // Executor encounters an error
AgentResponseEvent // An agent run produces output
AgentResponseUpdateEvent // An agent run produces a streaming update
// Superstep events
SuperStepStartedEvent // Superstep begins
SuperStepCompletedEvent // Superstep completes
// Request events
RequestInfoEvent // A request is issued
# All events use the unified WorkflowEvent class with a type discriminator:
# Workflow lifecycle events
WorkflowEvent.type == "started" # Workflow execution begins
WorkflowEvent.type == "status" # Workflow state changed (use .state)
WorkflowEvent.type == "output" # Workflow produces an output
WorkflowEvent.type == "failed" # Workflow terminated with error (use .details)
WorkflowEvent.type == "error" # Non-fatal error from user code
WorkflowEvent.type == "warning" # Workflow encountered a warning
# Executor events
WorkflowEvent.type == "executor_invoked" # Executor starts processing
WorkflowEvent.type == "executor_completed" # Executor finishes processing
WorkflowEvent.type == "executor_failed" # Executor encounters an error
WorkflowEvent.type == "data" # Executor emitted data (e.g., AgentResponse)
# Superstep events
WorkflowEvent.type == "superstep_started" # Superstep begins
WorkflowEvent.type == "superstep_completed" # Superstep completes
# Request events
WorkflowEvent.type == "request_info" # A request is issued
Использование событий
using Microsoft.Agents.AI.Workflows;
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
switch (evt)
{
case ExecutorInvokedEvent invoke:
Console.WriteLine($"Starting {invoke.ExecutorId}");
break;
case ExecutorCompletedEvent complete:
Console.WriteLine($"Completed {complete.ExecutorId}: {complete.Data}");
break;
case WorkflowOutputEvent output:
Console.WriteLine($"Workflow output: {output.Data}");
return;
case WorkflowErrorEvent error:
Console.WriteLine($"Workflow error: {error.Exception}");
return;
}
}
from agent_framework import WorkflowEvent
async for event in workflow.run_stream(input_message):
if event.type == "executor_invoked":
print(f"Starting {event.executor_id}")
elif event.type == "executor_completed":
print(f"Completed {event.executor_id}: {event.data}")
elif event.type == "output":
print(f"Workflow produced output: {event.data}")
return
elif event.type == "error":
print(f"Workflow error: {event.data}")
return
Пользовательские события
Пользовательские события позволяют исполнителям выдавать сигналы, относящиеся к домену, во время выполнения рабочего процесса, адаптированные к потребностям приложения. Ниже приведены некоторые примеры вариантов использования:
- Отслеживание хода выполнения — отображайте промежуточные шаги, чтобы звонящие могли показывать обновления состояния.
- Вывод диагностики — выводит предупреждения, метрики или отладочные сведения, не изменяя выходные данные рабочего процесса.
- Данные домена ретранслятора — отправка структурированных полезных данных (например, записи баз данных, вызовы инструментов) прослушивателям в режиме реального времени.
Определение пользовательских событий
Определите настраиваемое событие созданием подкласса WorkflowEvent. Базовый конструктор принимает необязательную нагрузку object? data, которая становится доступной через свойство Data.
using Microsoft.Agents.AI.Workflows;
// Simple event with a string payload
internal sealed class ProgressEvent(string step) : WorkflowEvent(step) { }
// Event with a structured payload
internal sealed class MetricsEvent(MetricsData metrics) : WorkflowEvent(metrics) { }
В Python, создайте настраиваемые события, используя класс WorkflowEvent непосредственно с строкой-дискриминатором пользовательского типа.
type Параметры data содержат все сведения.
from agent_framework import WorkflowEvent
# Create a custom event with a custom type string and payload
event = WorkflowEvent(type="progress", data="Step 1 complete")
# Custom event with a structured payload
event = WorkflowEvent(type="metrics", data={"latency_ms": 42, "tokens": 128})
Замечание
Типы событий "started", "status" и "failed" зарезервированы для уведомлений жизненного цикла фреймворка. Если исполнитель пытается выпустить один из этих типов, событие игнорируется и регистрируется предупреждение.
Генерация пользовательских событий
Вызовите кастомные события из обработчика сообщений исполнителя с помощью функции AddEventAsync на IWorkflowContext.
using Microsoft.Agents.AI.Workflows;
internal sealed class ProgressEvent(string step) : WorkflowEvent(step) { }
internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
await context.AddEventAsync(new ProgressEvent("Validating input"));
// Executor logic...
await context.AddEventAsync(new ProgressEvent("Processing complete"));
}
}
Передайте пользовательские события из обработчика событий, вызвав add_event на WorkflowContext.
from agent_framework import (
handler,
Executor,
WorkflowContext,
WorkflowEvent,
)
class CustomExecutor(Executor):
@handler
async def handle(self, message: str, ctx: WorkflowContext[str]) -> None:
await ctx.add_event(WorkflowEvent(type="progress", data="Validating input"))
# Executor logic...
await ctx.add_event(WorkflowEvent(type="progress", data="Processing complete"))
Использование пользовательских событий
Используйте сопоставление шаблонов для фильтрации пользовательского типа события в потоке событий:
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
switch (evt)
{
case ProgressEvent progress:
Console.WriteLine($"Progress: {progress.Data}");
break;
case WorkflowOutputEvent output:
Console.WriteLine($"Done: {output.Data}");
return;
}
}
Фильтрация по строке дискриминатора пользовательского типа:
async for event in workflow.run(input_message, stream=True):
if event.type == "progress":
print(f"Progress: {event.data}")
elif event.type == "output":
print(f"Done: {event.data}")
return
Дальнейшие шаги
См. также: