Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Lorsque vous ajoutez un agent IA à un flux de travail, il doit être encapsulé dans un exécuteur afin que le moteur de flux de travail puisse acheminer les messages vers celui-ci, gérer son état de session et gérer sa sortie. L’exécuteur de l’agent est l’exécuteur intégré qui gère cette adaptation.
Aperçu
L’exécuteur de l’agent permet de combler l’écart entre l’abstraction de l’agent et le modèle d’exécution du flux de travail. Elle effectue les actions suivantes :
- Reçoit des messages typés à partir du graphe de flux de travail et les transfère à l’agent sous-jacent.
- Gère la session et l'état de conversation de l'agent au cours des opérations.
- Adapte son comportement en fonction du mode d’exécution du flux de travail (streaming ou non-diffusion en continu).
- Génère des événements de sortie (
AgentResponseouAgentResponseUpdate) à l’appelant de workflow pour observation. - Envoie des messages aux exécuteurs en aval connectés pour un traitement continu dans le graphique.
- Prend en charge le point de contrôle pour les flux de travail de longue durée.
Fonctionnement
En C#, le moteur de flux de travail crée en interne un AIAgentHostExecutor pour chaque AIAgent ajout à un flux de travail. Cet exécuteur spécialisé étend ChatProtocolExecutor et utilise un modèle de turn token :
-
Mise en cache des messages : lorsque les messages arrivent d’autres exécuteurs, l’exécuteur de l’agent les collecte. S’il
ForwardIncomingMessagesest activé (valeur par défaut), les messages entrants sont également transférés aux exécuteurs en aval. -
Déclencheur de jeton : l’agent traite ses messages mis en cache uniquement après la réception d’un
TurnToken. -
Appel de l’agent : l’exécuteur appelle
RunAsync(non-streaming) ouRunStreamingAsync(streaming) sur l’agent sous-jacent. -
Rendement de sortie : si les événements de diffusion en continu sont activés, chaque incrémentiel
AgentResponseUpdateest généré en tant que sortie de flux de travail. SiEmitAgentResponseEventsest activé, l'agrégatAgentResponseest également généré en tant que sortie de flux de travaux. - Messagerie en aval : les messages de réponse de l’agent sont envoyés aux exécuteurs en aval connectés.
-
Passage du jeton en mode pass-through : après avoir terminé son tour, l’exécuteur envoie un nouveau
TurnTokenen aval afin que l’agent suivant de la chaîne puisse commencer le traitement.
Conseil / Astuce
Certains scénarios peuvent nécessiter un exécuteur d’agent plus spécialisé ; par exemple, les orchestrations de transfert utilisent une logique de HandoffAgentExecutor routage personnalisée dédiée.
Création implicite ou explicite
Lorsque vous passez un AIAgent à WorkflowBuilder, l’infrastructure l’encapsule automatiquement dans un AIAgentBinding, ce qui crée le sous-jacent AIAgentHostExecutor. Vous n’avez pas besoin d’instancier directement l’exécuteur de l’agent.
AIAgent writerAgent = /* create your agent */;
AIAgent reviewerAgent = /* create your agent */;
// Agents are automatically wrapped — no manual executor creation required
var workflow = new WorkflowBuilder(writerAgent)
.AddEdge(writerAgent, reviewerAgent)
.Build();
Vous pouvez également utiliser les méthodes d’assistance sur AgentWorkflowBuilder pour les modèles courants :
// Build a sequential pipeline of agents
var workflow = AgentWorkflowBuilder.BuildSequential(writerAgent, reviewerAgent);
Configuration personnalisée
Pour personnaliser le comportement de l’exécuteur de l’agent, utilisez BindAsExecutor avec AIAgentHostOptions:
var options = new AIAgentHostOptions
{
EmitAgentUpdateEvents = true,
EmitAgentResponseEvents = true,
ReassignOtherAgentsAsUsers = true,
ForwardIncomingMessages = true,
};
ExecutorBinding writerBinding = writerAgent.BindAsExecutor(options);
var workflow = new WorkflowBuilder(writerBinding)
.AddEdge(writerBinding, reviewerAgent)
.Build();
Types d’entrée
L’exécuteur de l’agent en C# accepte plusieurs types d’entrée : string, ChatMessageet IEnumerable<ChatMessage>. Les entrées de chaîne sont automatiquement converties en instances ChatMessage ayant le rôle User. Tous les messages entrants sont accumulés jusqu’à ce qu’un TurnToken message soit reçu, auquel point l’exécuteur traite le lot. Lorsqu’il ReassignOtherAgentsAsUsers est activé (valeur par défaut), les messages d’autres agents sont réaffectés au User rôle afin que le modèle sous-jacent les traite comme des entrées utilisateur, tandis que les messages de l’agent actuel conservent le Assistant rôle.
Sortie et chaînage
Une fois que l’agent a terminé son tour, l’exécuteur :
- Envoie les messages de réponse de l’agent à tous les exécuteurs en aval connectés.
- Transfère un nouvel
TurnTokenafin que l’agent suivant de la chaîne puisse commencer le traitement.
Cela rend l'enchaînement des agents simple : il suffit de les connecter par des arêtes.
var workflow = new WorkflowBuilder(frenchTranslator)
.AddEdge(frenchTranslator, spanishTranslator)
.AddEdge(spanishTranslator, englishTranslator)
.Build();
Comportement de diffusion en continu
Le comportement de diffusion en continu est contrôlé par l'option EmitAgentUpdateEvents sur AIAgentHostOptions, ou dynamiquement via TurnToken.
-
Lorsqu’il est activé , l’exécuteur appelle
RunStreamingAsyncl’agent et génère chacunAgentResponseUpdateen tant qu’événement de sortie de flux de travail. Cela fournit des mises à jour de jeton par jeton en temps réel. -
En cas de désactivation : l’exécuteur appelle
RunAsyncet produit une seule réponse complète.
// Enable streaming events at the configuration level
var options = new AIAgentHostOptions
{
EmitAgentUpdateEvents = true,
};
// Or enable streaming dynamically via TurnToken
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
Sessions partagées
Chaque exécuteur d’agent gère sa propre session par défaut. Pour partager une session entre les agents, configurez les agents avec un fournisseur de session commun avant de les ajouter au flux de travail.
Options de configuration
AIAgentHostOptions contrôle le comportement de l’exécuteur d'agent :
| Choix | Par défaut | Description |
|---|---|---|
EmitAgentUpdateEvents |
null |
Émettre des événements de mise à jour de streaming au cours de l’exécution.
TurnToken est prioritaire si la valeur est définie. Si les deux sont null, la diffusion en continu est désactivée. |
EmitAgentResponseEvents |
false |
Émettez la réponse de l’agent agrégé en tant qu’événement de sortie de flux de travail. |
InterceptUserInputRequests |
false |
Interceptez et routez-le UserInputRequestContent en tant que message de flux de travail pour traitement. |
InterceptUnterminatedFunctionCalls |
false |
Interceptez FunctionCallContent sans résultat correspondant et routez-le en tant que message de flux de travail. |
ReassignOtherAgentsAsUsers |
true |
Réaffectez les messages d’autres agents au User rôle afin que le modèle les traite comme des entrées utilisateur. |
ForwardIncomingMessages |
true |
Transférez les messages entrants vers les exécuteurs en aval avant les messages générés par l’agent. |
Points de contrôle
L’exécuteur de l’agent prend en charge la gestion des points de sauvegarde pour les processus de longue durée. Lorsqu’un point de contrôle est effectué, l’exécuteur sérialise :
- L'état de session de l'agent (via
SerializeSessionAsync). - La configuration d’émission d’événements du tour actuel (présente uniquement pendant que les requêtes sont en attente et que l’exécuteur n’a pas encore produit son entrant
TurnToken). - Requêtes d'entrée utilisateur en attente et requêtes d'appel de fonction.
Lors de la restauration, l’exécuteur désérialise la session et l’état de la requête en attente, permettant ainsi au flux de travail de reprendre là où il s'est arrêté.
Fonctionnement
La AgentExecutor classe encapsule un agent qui implémente le SupportsAgentRun protocole. Lorsque l’exécuteur reçoit un message :
-
Normalisation des messages : l’entrée est normalisée dans une liste d’objets
Messageet ajoutée au cache interne de l’exécuteur. L’exécuteur accepte plusieurs types d’entrée (str, ,Messagelist[str | Message],AgentExecutorRequestetAgentExecutorResponse) chaque routé vers un gestionnaire dédié qui normalise l’entrée avant la mise en cache. -
Appel de l’agent : l’exécuteur appelle
agent.run()avec les messages mis en cache, en sélectionnant automatiquement le mode streaming ou hors streaming en fonction du mode d’exécution du flux de travail. -
Émission de sortie : en mode streaming, chacun
AgentResponseUpdateest généré en tant qu’événement de sortie de flux de travail. En mode hors streaming, un seulAgentResponseest généré. -
Répartition en aval : une fois l’agent terminé, l’exécuteur envoie un
AgentExecutorResponseà tous les exécuteurs en aval connectés. Cette réponse inclut l’historique complet des conversations, ce qui permet un chaînage transparent. - Réinitialisation du cache : le cache de messages internes de l’exécuteur est effacé après l’appel de l’agent, ce qui garantit que chaque appel d’agent traite uniquement les nouveaux messages reçus depuis la dernière appel.
Conseil / Astuce
Certains scénarios peuvent nécessiter un exécuteur d’agent plus spécialisé ; par exemple, les orchestrations de transfert utilisent un exécuteur dédié avec une logique de routage personnalisée.
Création implicite ou explicite
Les WorkflowBuilder encapsulent automatiquement les agents dans des instances de AgentExecutor lorsque vous transmettez un agent directement. Pour la plupart des flux de travail, la création implicite est suffisante :
from agent_framework import WorkflowBuilder
writer_agent = client.as_agent(name="Writer", instructions="...")
reviewer_agent = client.as_agent(name="Reviewer", instructions="...")
# Agents are automatically wrapped — no manual AgentExecutor creation required
workflow = (
WorkflowBuilder(start_executor=writer_agent)
.add_edge(writer_agent, reviewer_agent)
.build()
)
Création explicite
Créez une AgentExecutor explicitement lorsque vous devez :
- Partagez une session entre plusieurs agents.
- Fournissez un ID d’exécuteur personnalisé pour le routage et les kwargs d’exécution ciblés.
- Référencez la même instance d’exécuteur dans plusieurs arêtes.
from agent_framework import AgentExecutor
writer_executor = AgentExecutor(writer_agent, id="my-writer")
reviewer_executor = AgentExecutor(reviewer_agent, id="my-reviewer")
workflow = (
WorkflowBuilder(start_executor=writer_executor)
.add_edge(writer_executor, reviewer_executor)
.build()
)
Paramètres du constructeur :
| Paramètre | Type | Description |
|---|---|---|
agent |
SupportsAgentRun |
Agent à encapsuler. |
session |
AgentSession \| None |
Session à utiliser pour les exécutions d'agents. Si None, une nouvelle session est créée par l'agent. |
id |
str \| None |
ID d’exécuteur unique. Par défaut, le nom de l'agent est utilisé si disponible. |
Conseil / Astuce
L’ID d’exécuteur est également la clé utilisée lorsque vous ciblez workflow.run(function_invocation_kwargs=...) ou client_kwargs= à des agents individuels. Si vous omettez id, le flux de travail utilise le nom de l’agent encapsulé.
Types d’entrée
Le AgentExecutor définit plusieurs méthodes de gestionnaire, chacune acceptant un type d’entrée différent. Le moteur de flux de travail distribue automatiquement le gestionnaire approprié en fonction du type de message. Tous les types d’entrée déclenchent l’exécution immédiate de l’agent, sauf pour AgentExecutorRequest où l’indicateur should_respond contrôle si l’agent s’exécute ou met simplement en cache les messages :
| Type d’entrée | Gestionnaire | Agent déclencheurs | Description |
|---|---|---|---|
AgentExecutorRequest |
run |
Conditionnelle | Type d’entrée canonique. Contient une liste de messages et un should_respond indicateur qui contrôle si l’agent s’exécute. |
str |
from_str |
Toujours | Accepte une chaîne de texte brute. |
Message |
from_message |
Toujours | Accepte un seul Message objet. |
list[str \| Message] |
from_messages |
Toujours | Accepte une liste de chaînes ou Message d’objets en tant que contexte de conversation. |
AgentExecutorResponse |
from_response |
Toujours | Accepte la réponse d’un exécuteur d’agent antérieur, ce qui active le chaînage direct. |
Utilisation de AgentExecutorRequest
AgentExecutorRequest est le type d’entrée canonique et fournit le plus de contrôle :
from agent_framework import AgentExecutorRequest, Message
# Create a request with messages
request = AgentExecutorRequest(
messages=[Message(role="user", contents=["Hello, world!"])],
should_respond=True,
)
# Run the workflow
result = await workflow.run(request)
L’indicateur should_respond contrôle si l’agent traite immédiatement les messages ou les met simplement en cache ultérieurement :
-
True(valeur par défaut) : l’agent s’exécute et produit une réponse. -
False— les messages sont ajoutés au cache, mais l’agent ne s’exécute pas. Cela est utile pour précharger le contexte de conversation avant de déclencher une réponse.
Sortie et chaînage
Une fois l’agent terminé, l’exécuteur envoie un AgentExecutorResponse en aval. Cette classe de données contient :
| Champ | Type | Description |
|---|---|---|
executor_id |
str |
ID de l’exécuteur qui a produit la réponse. |
agent_response |
AgentResponse |
Réponse de l’agent sous-jacent (inchangée par rapport au client). |
full_conversation |
list[Message] \| None |
Contexte de conversation complet (entrées antérieures + sorties de l’agent) pour l'enchaînement des conversations. |
Lors du chaînage des exécuteurs d’agents, l’exécuteur en aval reçoit le AgentExecutorResponse via le gestionnaire from_response. Il utilise le champ pour conserver l’historique full_conversation complet des conversations, ce qui empêche les agents en aval de perdre le contexte antérieur :
spam_detector = AgentExecutor(create_spam_detector_agent())
email_assistant = AgentExecutor(create_email_assistant_agent())
# The email_assistant receives the spam_detector's full conversation context
workflow = (
WorkflowBuilder(start_executor=spam_detector)
.add_edge(spam_detector, email_assistant)
.build()
)
Comportement de diffusion en continu
AgentExecutor s’adapte automatiquement au mode d’exécution du workflow :
-
stream=True— appelleagent.run(stream=True)et génère chacunAgentResponseUpdateen tant qu’événement de sortie de flux de travail. Une fois la diffusion en continu terminée, les mises à jour sont agrégées dans une version complèteAgentResponsepour envoi en aval. -
stream=False(valeur par défaut) : appelleagent.run(stream=False)et génère un seulAgentResponseévénement de sortie de flux de travail.
# Streaming mode — receive incremental updates
events = workflow.run("Write a story about a cat.", stream=True)
async for event in events:
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
print(event.data.text, end="", flush=True)
# Non-streaming mode — receive complete response
result = await workflow.run("Write a story about a cat.")
# Retrieve AgentResponse objects from the result
outputs = result.get_outputs()
for output in outputs:
if isinstance(output, AgentResponse):
print(output.text)
Sessions partagées
Par défaut, chacun AgentExecutor crée sa propre session. Pour partager une session entre plusieurs agents (par exemple, pour gérer un thread de conversation courant), créez une session explicitement et transmettez-la à chaque exécuteur :
from agent_framework import AgentExecutor
# Create a shared session from one agent
shared_session = writer_agent.create_session()
# Both executors share the same session
writer_executor = AgentExecutor(writer_agent, session=shared_session)
reviewer_executor = AgentExecutor(reviewer_agent, session=shared_session)
Note
Tous les agents ne prennent pas en charge les sessions partagées. En règle générale, seuls les agents du même type de fournisseur peuvent partager une session.
Points de contrôle
Le AgentExecutor prend en charge le point de contrôle pour enregistrer et restaurer l'état dans les workflows de longue durée. Lorsqu’un point de contrôle est effectué, l’exécuteur sérialise :
- Cache de messages interne.
- Historique complet des conversations.
- État de session de l’agent.
- Toutes les demandes et réponses d’entrée utilisateur en attente.
Lors de la restauration, l’exécuteur désérialise cet état, ce qui permet au flux de travail de reprendre à partir de l’endroit où il s’est arrêté.
Avertissement
Le point de contrôle avec les agents qui utilisent des sessions côté serveur (par exemple FoundryAgent) présente des limitations. L’état de session côté serveur n’est pas capturé dans les points de contrôle et peut être modifié par les exécutions suivantes. Envisagez d’implémenter un exécuteur personnalisé si vous avez besoin de points de contrôle fiables avec des sessions côté serveur.