Partager via


Orchestration des workflows du framework Microsoft Agent - Séquentiel

Dans l’orchestration séquentielle, les agents sont organisés dans un pipeline. Chaque agent traite la tâche à son tour, en passant sa sortie à l’agent suivant dans la séquence. Cela est idéal pour les flux de travail où chaque étape s’appuie sur l’étape précédente, comme la révision de document, les pipelines de traitement des données ou le raisonnement à plusieurs étapes.

Orchestration séquentielle

Important

L’historique complet des conversations des agents précédents est passé à l’agent suivant dans la séquence. Chaque agent peut voir tous les messages antérieurs, ce qui permet un traitement prenant en charge le contexte.

Ce que vous allez apprendre

  • Comment créer un pipeline séquentiel d’agents
  • Guide pratique pour chaîner des agents dans lesquels chacun s’appuie sur la sortie précédente
  • Comment combiner des agents avec des exécuteurs personnalisés pour des tâches spécialisées
  • Comment suivre le flux de conversation via le pipeline

Définir vos agents

Dans l’orchestration séquentielle, les agents sont organisés dans un pipeline où chaque agent traite la tâche à son tour, en passant la sortie à l’agent suivant dans la séquence.

Configurer le client Azure OpenAI

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetChatClient(deploymentName)
    .AsIChatClient();

Avertissement

DefaultAzureCredential est pratique pour le développement, mais nécessite une considération minutieuse en production. En production, envisagez d’utiliser des informations d’identification spécifiques (par exemple ManagedIdentityCredential) pour éviter les problèmes de latence, la détection involontaire des informations d’identification et les risques de sécurité potentiels liés aux mécanismes de secours.

Créez des agents spécialisés qui fonctionneront en séquence :

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

Configurer l’orchestration séquentielle

Créez le flux de travail en utilisant AgentWorkflowBuilder :

// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);

Exécuter le flux de travail séquentiel

Exécutez le flux de travail et traitez les événements :

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentResponseUpdateEvent e)
    {
        if (e.ExecutorId != lastExecutorId)
        {
            lastExecutorId = e.ExecutorId;
            Console.WriteLine();
            Console.Write($"{e.ExecutorId}: ");
        }

        Console.Write(e.Update.Text);
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = outputEvt.As<List<ChatMessage>>()!;
        break;
    }
}

// Display final result
Console.WriteLine();
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Content}");
}

Exemple de sortie

French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!

Concepts clés

  • Traitement séquentiel : chaque agent traite la sortie de l’agent précédent dans l’ordre
  • AgentWorkflowBuilder.BuildSequential() : crée un flux de travail de pipeline à partir d’une collection d’agents
  • ChatClientAgent : représente un agent soutenu par un client de conversation avec des instructions spécifiques
  • InProcessExecution.RunStreamingAsync() : exécute le flux de travail et retourne une StreamingRun diffusion en continu d’événements en temps réel
  • Gestion des événements : surveiller la progression de l’agent via AgentResponseUpdateEvent et l’achèvement via WorkflowOutputEvent

Dans l’orchestration séquentielle, chaque agent traite la tâche à son tour, avec un résultat qui se transfère d’un à l’autre. Commencez par définir des agents pour un processus en deux étapes :

import os
from agent_framework.azure import AzureOpenAIResponsesClient
from azure.identity import AzureCliCredential

# 1) Create agents using AzureOpenAIResponsesClient
chat_client = AzureOpenAIResponsesClient(
    project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
    deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
    credential=AzureCliCredential(),
)

writer = chat_client.as_agent(
    instructions=(
        "You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
    ),
    name="writer",
)

reviewer = chat_client.as_agent(
    instructions=(
        "You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
    ),
    name="reviewer",
)

Configurer l’orchestration séquentielle

La SequentialBuilder classe crée un pipeline dans lequel les agents traitent les tâches dans l’ordre. Chaque agent voit l’historique complet des conversations et ajoute sa réponse :

from agent_framework.orchestrations import SequentialBuilder

# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()

Exécuter le flux de travail séquentiel

Exécutez le flux de travail et collectez la conversation finale montrant la contribution de chaque agent :

from typing import Any, cast
from agent_framework import Message, WorkflowEvent

# 3) Run and print final conversation
outputs: list[list[Message]] = []
async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
    if event.type == "output":
        outputs.append(cast(list[Message], event.data))

if outputs:
    print("===== Final Conversation =====")
    messages: list[Message] = outputs[-1]
    for i, msg in enumerate(messages, start=1):
        name = msg.author_name or ("assistant" if msg.role == "assistant" else "user")
        print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")

Exemple de sortie

===== Final Conversation =====
------------------------------------------------------------
01 [user]
Write a tagline for a budget-friendly eBike.
------------------------------------------------------------
02 [writer]
Ride farther, spend less—your affordable eBike adventure starts here.
------------------------------------------------------------
03 [reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!

Avancé : Combinaison d’agents avec des exécuteurs personnalisés

L’orchestration séquentielle prend en charge la combinaison d’agents avec des exécuteurs personnalisés pour un traitement spécifique. Cela est utile lorsque vous avez besoin d’une logique personnalisée qui ne nécessite pas de LLM :

Définir un exécuteur personnalisé

Note

Lorsqu’un exécuteur personnalisé suit un agent dans la séquence, son gestionnaire reçoit un AgentExecutorResponse (car les agents sont encapsulés en interne par AgentExecutor). Permet agent_response.full_conversation d’accéder à l’historique complet des conversations.

from agent_framework import AgentExecutorResponse, Executor, WorkflowContext, handler
from agent_framework import Message

class Summarizer(Executor):
    """Simple summarizer: consumes full conversation and appends an assistant summary."""

    @handler
    async def summarize(
        self,
        agent_response: AgentExecutorResponse,
        ctx: WorkflowContext[list[Message]]
    ) -> None:
        if not agent_response.full_conversation:
            await ctx.send_message([Message("assistant", ["No conversation to summarize."])])
            return

        users = sum(1 for m in agent_response.full_conversation if m.role == "user")
        assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
        summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
        await ctx.send_message(list(agent_response.full_conversation) + [summary])

Créer un flux de travail séquentiel mixte

# Create a content agent
content = chat_client.as_agent(
    instructions="Produce a concise paragraph answering the user's request.",
    name="content",
)

# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()

Exemple de sortie avec exécuteur personnalisé

------------------------------------------------------------
01 [user]
Explain the benefits of budget eBikes for commuters.
------------------------------------------------------------
02 [content]
Budget eBikes offer commuters an affordable, eco-friendly alternative to cars and public transport.
Their electric assistance reduces physical strain and allows riders to cover longer distances quickly,
minimizing travel time and fatigue. Budget models are low-cost to maintain and operate, making them accessible
for a wider range of people. Additionally, eBikes help reduce traffic congestion and carbon emissions,
supporting greener urban environments. Overall, budget eBikes provide cost-effective, efficient, and
sustainable transportation for daily commuting needs.
------------------------------------------------------------
03 [assistant]
Summary -> users:1 assistants:1

Concepts clés

  • Contexte partagé : chaque participant reçoit l’historique complet des conversations, y compris tous les messages précédents
  • Ordre important : les agents s’exécutent strictement dans l’ordre spécifié dans la participants liste
  • Participants flexibles : vous pouvez combiner des agents et des exécuteurs personnalisés dans n’importe quel ordre
  • Flux de conversation : chaque agent/exécuteur ajoute à la conversation, en créant un dialogue complet

Prochaines étapes