Notă
Accesul la această pagină necesită autorizare. Puteți încerca să vă conectați sau să modificați directoarele.
Accesul la această pagină necesită autorizare. Puteți încerca să modificați directoarele.
Workflows let you chain multiple steps together — each step processes data and passes it to the next.
Define workflow steps (executors):
using Microsoft.Agents.AI.Workflows;
// Step 1: Convert text to uppercase
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");
// Step 2: Reverse the string and yield output
class ReverseTextExecutor() : Executor<string, string>("ReverseTextExecutor")
{
public override ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
return ValueTask.FromResult(string.Concat(message.Reverse()));
}
}
ReverseTextExecutor reverse = new();
Build and run the workflow:
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
var workflow = builder.Build();
await using Run run = await InProcessExecution.RunAsync(workflow, "Hello, World!");
foreach (WorkflowEvent evt in run.NewEvents)
{
if (evt is ExecutorCompletedEvent executorComplete)
{
Console.WriteLine($"{executorComplete.ExecutorId}: {executorComplete.Data}");
}
}
Tip
See here for a full runnable sample application.
Define workflow steps (executors) and connect them with edges:
# Step 1: A class-based executor that converts text to uppercase
class UpperCase(Executor):
def __init__(self, id: str):
super().__init__(id=id)
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Convert input to uppercase and forward to the next node."""
await ctx.send_message(text.upper())
# Step 2: A function-based executor that reverses the string and yields output
@executor(id="reverse_text")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
"""Reverse the string and yield the final workflow output."""
await ctx.yield_output(text[::-1])
def create_workflow():
"""Build the workflow: UpperCase → reverse_text."""
upper = UpperCase(id="upper_case")
return WorkflowBuilder(start_executor=upper).add_edge(upper, reverse_text).build()
Build and run the workflow:
workflow = create_workflow()
events = await workflow.run("hello world")
print(f"Output: {events.get_outputs()}")
print(f"Final state: {events.get_final_state()}")
Tip
See the full sample for the complete runnable file.
Next steps
Go deeper:
- Workflows overview — understand workflow architecture
- Sequential workflows — linear step-by-step patterns
- Agents in workflows — using agents as workflow steps