Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
System zdarzeń przepływu pracy zapewnia obserwowalność wykonania przepływu pracy. Zdarzenia są emitowane w kluczowych punktach podczas wykonywania i mogą być odbierane w czasie rzeczywistym za pośrednictwem transmisji strumieniowej.
Wbudowane typy zdarzeń
// Workflow lifecycle events
WorkflowStartedEvent // Workflow execution begins
WorkflowOutputEvent // Workflow outputs data
WorkflowErrorEvent // Workflow encounters an error
WorkflowWarningEvent // Workflow encountered a warning
// Executor events
ExecutorInvokedEvent // Executor starts processing
ExecutorCompletedEvent // Executor finishes processing
ExecutorFailedEvent // Executor encounters an error
AgentResponseEvent // An agent run produces output
AgentResponseUpdateEvent // An agent run produces a streaming update
// Superstep events
SuperStepStartedEvent // Superstep begins
SuperStepCompletedEvent // Superstep completes
// Request events
RequestInfoEvent // A request is issued
# All events use the unified WorkflowEvent class with a type discriminator:
# Workflow lifecycle events
WorkflowEvent.type == "started" # Workflow execution begins
WorkflowEvent.type == "status" # Workflow state changed (use .state)
WorkflowEvent.type == "output" # Workflow produces an output
WorkflowEvent.type == "failed" # Workflow terminated with error (use .details)
WorkflowEvent.type == "error" # Non-fatal error from user code
WorkflowEvent.type == "warning" # Workflow encountered a warning
# Executor events
WorkflowEvent.type == "executor_invoked" # Executor starts processing
WorkflowEvent.type == "executor_completed" # Executor finishes processing
WorkflowEvent.type == "executor_failed" # Executor encounters an error
WorkflowEvent.type == "data" # Executor emitted data (e.g., AgentResponse)
# Superstep events
WorkflowEvent.type == "superstep_started" # Superstep begins
WorkflowEvent.type == "superstep_completed" # Superstep completes
# Request events
WorkflowEvent.type == "request_info" # A request is issued
Korzystanie ze zdarzeń
using Microsoft.Agents.AI.Workflows;
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
switch (evt)
{
case ExecutorInvokedEvent invoke:
Console.WriteLine($"Starting {invoke.ExecutorId}");
break;
case ExecutorCompletedEvent complete:
Console.WriteLine($"Completed {complete.ExecutorId}: {complete.Data}");
break;
case WorkflowOutputEvent output:
Console.WriteLine($"Workflow output: {output.Data}");
return;
case WorkflowErrorEvent error:
Console.WriteLine($"Workflow error: {error.Exception}");
return;
}
}
from agent_framework import WorkflowEvent
async for event in workflow.run_stream(input_message):
if event.type == "executor_invoked":
print(f"Starting {event.executor_id}")
elif event.type == "executor_completed":
print(f"Completed {event.executor_id}: {event.data}")
elif event.type == "output":
print(f"Workflow produced output: {event.data}")
return
elif event.type == "error":
print(f"Workflow error: {event.data}")
return
Zdarzenia niestandardowe
Zdarzenia niestandardowe pozwalają wykonawcom emitować sygnały specyficzne dla domeny podczas wykonywania przepływu pracy dostosowane do potrzeb aplikacji. Oto przykładowe przypadki użycia:
- Śledzenie postępu — zgłaszanie kroków pośrednich, dzięki czemu osoby wywołujące mogą wyświetlać aktualizacje stanu.
- Emituj dane diagnostyczne — przekaż ostrzeżenia, metryki lub informacje debugowania bez zmieniania danych wyjściowych przepływu pracy.
- Przekazywanie danych domeny — przesyłanie struktur danych (np. zapisy bazy danych, wywołania narzędzi) do odbiorników w czasie rzeczywistym.
Definiowanie zdarzeń niestandardowych
Zdefiniuj zdarzenie niestandardowe według podklasy WorkflowEvent. Konstruktor bazowy akceptuje opcjonalny object? data ładunek udostępniany za pośrednictwem Data właściwości.
using Microsoft.Agents.AI.Workflows;
// Simple event with a string payload
internal sealed class ProgressEvent(string step) : WorkflowEvent(step) { }
// Event with a structured payload
internal sealed class MetricsEvent(MetricsData metrics) : WorkflowEvent(metrics) { }
W Pythonie utwórz zdarzenia niestandardowe, używając klasy WorkflowEvent bezpośrednio z ciągiem znaków dyskryminującym typ niestandardowy. Parametry type i data zawierają wszystkie informacje.
from agent_framework import WorkflowEvent
# Create a custom event with a custom type string and payload
event = WorkflowEvent(type="progress", data="Step 1 complete")
# Custom event with a structured payload
event = WorkflowEvent(type="metrics", data={"latency_ms": 42, "tokens": 128})
Uwaga / Notatka
Typy zdarzeń "started", "status"i "failed" są zarezerwowane dla powiadomień cyklu życia platformy. Jeśli egzekutor próbuje emitować jeden z tych typów, zdarzenie jest ignorowane, a ostrzeżenie jest rejestrowane.
Emitowanie zdarzeń niestandardowych
Emituj zdarzenia niestandardowe z programu obsługi komunikatów egzekutora, wywołując AddEventAsync w IWorkflowContext.
using Microsoft.Agents.AI.Workflows;
internal sealed class ProgressEvent(string step) : WorkflowEvent(step) { }
internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
await context.AddEventAsync(new ProgressEvent("Validating input"));
// Executor logic...
await context.AddEventAsync(new ProgressEvent("Processing complete"));
}
}
Emituj niestandardowe zdarzenia za pomocą programu obsługi poprzez wywołanie add_event na WorkflowContext:
from agent_framework import (
handler,
Executor,
WorkflowContext,
WorkflowEvent,
)
class CustomExecutor(Executor):
@handler
async def handle(self, message: str, ctx: WorkflowContext[str]) -> None:
await ctx.add_event(WorkflowEvent(type="progress", data="Validating input"))
# Executor logic...
await ctx.add_event(WorkflowEvent(type="progress", data="Processing complete"))
Korzystanie ze zdarzeń niestandardowych
Skorzystaj z dopasowania wzorców, aby filtrować niestandardowy typ zdarzenia w strumieniu zdarzeń.
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
switch (evt)
{
case ProgressEvent progress:
Console.WriteLine($"Progress: {progress.Data}");
break;
case WorkflowOutputEvent output:
Console.WriteLine($"Done: {output.Data}");
return;
}
}
Filtruj ciąg rozróżniający typ niestandardowy:
async for event in workflow.run(input_message, stream=True):
if event.type == "progress":
print(f"Progress: {event.data}")
elif event.type == "output":
print(f"Done: {event.data}")
return
Dalsze kroki
Powiązane tematy: