Partager via


Étape 5 : Flux de travail

Les flux de travail vous permettent de chaîner plusieurs étapes ensemble : chaque étape traite les données et les transmet à la suivante.

Définir les étapes du flux de travail (exécuteurs) :

using Microsoft.Agents.AI.Workflows;

// Step 1: Convert text to uppercase
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");

// Step 2: Reverse the string and yield output
class ReverseTextExecutor() : Executor<string, string>("ReverseTextExecutor")
{
    public override ValueTask<string> HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        return ValueTask.FromResult(string.Concat(message.Reverse()));
    }
}
ReverseTextExecutor reverse = new();

Générez et exécutez le flux de travail :

WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
var workflow = builder.Build();

await using Run run = await InProcessExecution.RunAsync(workflow, "Hello, World!");
foreach (WorkflowEvent evt in run.NewEvents)
{
    if (evt is ExecutorCompletedEvent executorComplete)
    {
        Console.WriteLine($"{executorComplete.ExecutorId}: {executorComplete.Data}");
    }
}

Conseil / Astuce

Consultez cet article pour obtenir un exemple d’application exécutable complet.

Définissez les étapes de flux de travail (exécuteurs) et connectez-les à des arêtes :

# Step 1: A class-based executor that converts text to uppercase
class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert input to uppercase and forward to the next node."""
        await ctx.send_message(text.upper())


# Step 2: A function-based executor that reverses the string and yields output
@executor(id="reverse_text")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the string and yield the final workflow output."""
    await ctx.yield_output(text[::-1])


def create_workflow():
    """Build the workflow: UpperCase → reverse_text."""
    upper = UpperCase(id="upper_case")
    return WorkflowBuilder(start_executor=upper).add_edge(upper, reverse_text).build()

Générez et exécutez le flux de travail :

workflow = create_workflow()

events = await workflow.run("hello world")
print(f"Output: {events.get_outputs()}")
print(f"Final state: {events.get_final_state()}")

Conseil / Astuce

Consultez l’exemple complet du fichier exécutable complet.

Prochaines étapes

Aller plus loin :