Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Dans l’orchestration séquentielle, les agents sont organisés dans un pipeline. Chaque agent traite la tâche à son tour, en passant sa sortie à l’agent suivant dans la séquence. Cela est idéal pour les flux de travail où chaque étape s’appuie sur l’étape précédente, comme la révision de document, les pipelines de traitement des données ou le raisonnement à plusieurs étapes.
Important
L’historique complet des conversations des agents précédents est passé à l’agent suivant dans la séquence. Chaque agent peut voir tous les messages antérieurs, ce qui permet un traitement prenant en charge le contexte.
Ce que vous allez apprendre
- Comment créer un pipeline séquentiel d’agents
- Guide pratique pour chaîner des agents dans lesquels chacun s’appuie sur la sortie précédente
- Comment ajouter une approbation par un humain pour les appels d'outils critiques.
- Comment combiner des agents avec des exécuteurs personnalisés pour des tâches spécialisées
- Comment suivre le flux de conversation via le pipeline
Définir vos agents
Dans l’orchestration séquentielle, les agents sont organisés dans un pipeline où chaque agent traite la tâche à son tour, en passant la sortie à l’agent suivant dans la séquence.
Configurer le client Azure OpenAI
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
.GetProjectOpenAIClient()
.GetProjectResponsesClient()
.AsIChatClient(deploymentName);
Avertissement
DefaultAzureCredential est pratique pour le développement, mais nécessite une considération minutieuse en production. En production, envisagez d’utiliser des informations d’identification spécifiques (par exemple ManagedIdentityCredential) pour éviter les problèmes de latence, la détection involontaire des informations d’identification et les risques de sécurité potentiels liés aux mécanismes de secours.
Créez des agents spécialisés qui fonctionneront en séquence :
// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
new(chatClient,
$"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
$"input by outputting the name of the input language and then translating the input to {targetLanguage}.");
// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
select GetTranslationAgent(lang, client));
Configurer l’orchestration séquentielle
Créez le flux de travail en utilisant AgentWorkflowBuilder :
// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);
Exécuter le flux de travail séquentiel
Exécutez le flux de travail et traitez les événements :
// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentResponseUpdateEvent e)
{
if (e.ExecutorId != lastExecutorId)
{
lastExecutorId = e.ExecutorId;
Console.WriteLine();
Console.Write($"{e.ExecutorId}: ");
}
Console.Write(e.Update.Text);
}
else if (evt is WorkflowOutputEvent outputEvt)
{
result = outputEvt.As<List<ChatMessage>>()!;
break;
}
}
// Display final result
Console.WriteLine();
foreach (var message in result)
{
Console.WriteLine($"{message.Role}: {message.Text}");
}
Exemple de sortie
French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!
Orchestration séquentielle avec Human-in-the-Loop
Les orchestrations séquentielles prennent en charge les interactions avec intervention humaine par l'approbation des outils. Lorsque les agents utilisent des outils encapsulés avec ApprovalRequiredAIFunction, le flux de travail se suspend et émet un RequestInfoEvent contenant un ToolApprovalRequestContent. Les systèmes externes (tels qu’un opérateur humain) peuvent inspecter l’appel de l’outil, l’approuver ou le rejeter, et le flux de travail reprend en conséquence.
Conseil / Astuce
Pour plus d’informations sur le modèle de demande et de réponse, consultez Human-in-the-Loop.
Définir des agents utilisant des outils qui nécessitent une approbation
Créez des agents dans lesquels les outils sensibles sont encapsulés avec ApprovalRequiredAIFunction:
ChatClientAgent deployAgent = new(
client,
"You are a DevOps engineer. Check staging status first, then deploy to production.",
"DeployAgent",
"Handles deployments",
[
AIFunctionFactory.Create(CheckStagingStatus),
new ApprovalRequiredAIFunction(AIFunctionFactory.Create(DeployToProduction))
]);
ChatClientAgent verifyAgent = new(
client,
"You are a QA engineer. Verify that the deployment was successful and summarize the results.",
"VerifyAgent",
"Verifies deployments");
Générer et exécuter avec la gestion des approbations
Générez normalement le flux de travail séquentiel. Le flux d’approbation est géré via le flux d’événements :
var workflow = AgentWorkflowBuilder.BuildSequential([deployAgent, verifyAgent]);
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is RequestInfoEvent e &&
e.Request.TryGetDataAs(out ToolApprovalRequestContent? approvalRequest))
{
await run.SendResponseAsync(
e.Request.CreateResponse(approvalRequest.CreateResponse(approved: true)));
}
}
Note
AgentWorkflowBuilder.BuildSequential() prend en charge l’approbation de l’outil d’emblée : aucune configuration supplémentaire n’est nécessaire. Lorsqu’un agent appelle un outil encapsulé avec ApprovalRequiredAIFunction, le flux de travail suspend et émet automatiquement un RequestInfoEvent.
Conseil / Astuce
Pour obtenir un exemple d’exécution complet de ce flux d’approbation, consultez l’exempleGroupChatToolApproval. Le même RequestInfoEvent modèle de gestion s’applique à d’autres orchestrations.
Concepts clés
- Traitement séquentiel : chaque agent traite la sortie de l’agent précédent dans l’ordre
- AgentWorkflowBuilder.BuildSequential() : crée un flux de travail de pipeline à partir d’une collection d’agents
- ChatClientAgent : représente un agent soutenu par un client de conversation avec des instructions spécifiques
-
InProcessExecution.RunStreamingAsync() : exécute le flux de travail et retourne une
StreamingRundiffusion en continu d’événements en temps réel -
Gestion des événements : surveiller la progression de l’agent via
AgentResponseUpdateEventet l’achèvement viaWorkflowOutputEvent -
Approbation de l’outil : Entourez les outils sensibles avec
ApprovalRequiredAIFunctionpour nécessiter une approbation humaine avant l’exécution. -
RequestInfoEvent : émis lorsqu’un outil requiert une approbation ; contient
ToolApprovalRequestContentles détails de l’appel de l’outil
Dans l’orchestration séquentielle, chaque agent traite la tâche à son tour, avec un résultat qui se transfère d’un à l’autre. Commencez par définir des agents pour un processus en deux étapes :
import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
# 1) Create agents using FoundryChatClient
chat_client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
)
writer = chat_client.as_agent(
instructions=(
"You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
),
name="writer",
)
reviewer = chat_client.as_agent(
instructions=(
"You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
),
name="reviewer",
)
Configurer l’orchestration séquentielle
La SequentialBuilder classe crée un pipeline dans lequel les agents traitent les tâches dans l’ordre. Chaque agent voit l’historique complet des conversations et ajoute sa réponse :
from agent_framework.orchestrations import SequentialBuilder
# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()
Exécuter le flux de travail séquentiel
Exécutez le flux de travail et collectez la conversation finale montrant la contribution de chaque agent :
from typing import Any, cast
from agent_framework import Message, WorkflowEvent
# 3) Run and print final conversation
outputs: list[list[Message]] = []
async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
if event.type == "output":
outputs.append(cast(list[Message], event.data))
if outputs:
print("===== Final Conversation =====")
messages: list[Message] = outputs[-1]
for i, msg in enumerate(messages, start=1):
name = msg.author_name or ("assistant" if msg.role == "assistant" else "user")
print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")
Exemple de sortie
===== Final Conversation =====
------------------------------------------------------------
01 [user]
Write a tagline for a budget-friendly eBike.
------------------------------------------------------------
02 [writer]
Ride farther, spend less—your affordable eBike adventure starts here.
------------------------------------------------------------
03 [reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!
Avancé : Combinaison d’agents avec des exécuteurs personnalisés
L’orchestration séquentielle prend en charge la combinaison d’agents avec des exécuteurs personnalisés pour un traitement spécifique. Cela est utile lorsque vous avez besoin d’une logique personnalisée qui ne nécessite pas de LLM :
Définir un exécuteur personnalisé
Note
Lorsqu’un exécuteur personnalisé suit un agent dans la séquence, son gestionnaire reçoit un AgentExecutorResponse (car les agents sont encapsulés en interne par AgentExecutor). Permet agent_response.full_conversation d’accéder à l’historique complet des conversations.
from agent_framework import AgentExecutorResponse, Executor, WorkflowContext, handler
from agent_framework import Message
class Summarizer(Executor):
"""Simple summarizer: consumes full conversation and appends an assistant summary."""
@handler
async def summarize(
self,
agent_response: AgentExecutorResponse,
ctx: WorkflowContext[list[Message]]
) -> None:
if not agent_response.full_conversation:
await ctx.send_message([Message("assistant", ["No conversation to summarize."])])
return
users = sum(1 for m in agent_response.full_conversation if m.role == "user")
assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
await ctx.send_message(list(agent_response.full_conversation) + [summary])
Créer un flux de travail séquentiel mixte
# Create a content agent
content = chat_client.as_agent(
instructions="Produce a concise paragraph answering the user's request.",
name="content",
)
# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()
Exemple de sortie avec exécuteur personnalisé
------------------------------------------------------------
01 [user]
Explain the benefits of budget eBikes for commuters.
------------------------------------------------------------
02 [content]
Budget eBikes offer commuters an affordable, eco-friendly alternative to cars and public transport.
Their electric assistance reduces physical strain and allows riders to cover longer distances quickly,
minimizing travel time and fatigue. Budget models are low-cost to maintain and operate, making them accessible
for a wider range of people. Additionally, eBikes help reduce traffic congestion and carbon emissions,
supporting greener urban environments. Overall, budget eBikes provide cost-effective, efficient, and
sustainable transportation for daily commuting needs.
------------------------------------------------------------
03 [assistant]
Summary -> users:1 assistants:1
Orchestration séquentielle avec Human-in-the-Loop
Les orchestrations séquentielles prennent en charge les interactions humaines dans la boucle de deux façons : approbation des outils pour contrôler les appels d’outils sensibles et demander des informations pour la suspension après chaque réponse de l’agent pour recueillir des commentaires.
Conseil / Astuce
Pour plus d’informations sur le modèle de demande et de réponse, consultez Human-in-the-Loop.
Approbation des outils dans les flux de travail séquentiels
Permet @tool(approval_mode="always_require") de marquer les outils qui ont besoin d’une approbation humaine avant l’exécution. Le flux de travail suspend et émet un request_info événement lorsque l’agent tente d’appeler l’outil.
@tool(approval_mode="always_require")
def execute_database_query(query: str) -> str:
return f"Query executed successfully: {query}"
database_agent = Agent(
client=chat_client,
name="DatabaseAgent",
instructions="You are a database assistant.",
tools=[execute_database_query],
)
workflow = SequentialBuilder(participants=[database_agent]).build()
Traitez le flux d’événements et gérez les demandes d’approbation :
async def process_event_stream(stream):
responses = {}
async for event in stream:
if event.type == "request_info" and event.data.type == "function_approval_request":
responses[event.request_id] = event.data.to_function_approval_response(approved=True)
return responses if responses else None
stream = workflow.run("Check the schema and update all pending orders", stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Conseil / Astuce
Pour obtenir un exemple exécutable complet, consultez sequential_builder_tool_approval.py. L'approbation de l'outil fonctionne avec SequentialBuilder sans configuration supplémentaire du constructeur.
Demander des informations sur les commentaires de l’agent
Utilisez .with_request_info() pour suspendre après que des agents spécifiques répondent, et permettre l’entrée externe (par exemple, la révision humaine) avant le début de l’agent suivant.
drafter = Agent(
client=chat_client,
name="drafter",
instructions="You are a document drafter. Create a brief draft on the given topic.",
)
editor = Agent(
client=chat_client,
name="editor",
instructions="You are an editor. Review and improve the draft. Incorporate any human feedback.",
)
finalizer = Agent(
client=chat_client,
name="finalizer",
instructions="You are a finalizer. Create a polished final version.",
)
# Enable request info for the editor agent only
workflow = (
SequentialBuilder(participants=[drafter, editor, finalizer])
.with_request_info(agents=["editor"])
.build()
)
async def process_event_stream(stream):
responses = {}
async for event in stream:
if event.type == "request_info":
responses[event.request_id] = AgentRequestInfoResponse.approve()
return responses if responses else None
stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Conseil / Astuce
Consultez les exemples complets : approbation de l’outil séquentiel et informations de demande séquentielle.
Concepts clés
- Contexte partagé : chaque participant reçoit l’historique complet des conversations, y compris tous les messages précédents
-
Ordre important : les agents s’exécutent strictement dans l’ordre spécifié dans la
participantsliste - Participants flexibles : vous pouvez combiner des agents et des exécuteurs personnalisés dans n’importe quel ordre
- Flux de conversation : chaque agent/exécuteur ajoute à la conversation, en créant un dialogue complet
-
Approbation de l’outil : Utilisation
@tool(approval_mode="always_require")pour les opérations sensibles nécessitant une révision humaine -
Demander des informations : Utilisez
.with_request_info(agents=[...])pour mettre en pause après certains agents pour les retours externes