Partager via


Exécuteur d'agent

Lorsque vous ajoutez un agent IA à un flux de travail, il doit être encapsulé dans un exécuteur afin que le moteur de flux de travail puisse acheminer les messages vers celui-ci, gérer son état de session et gérer sa sortie. L’exécuteur de l’agent est l’exécuteur intégré qui gère cette adaptation.

Aperçu

L’exécuteur de l’agent permet de combler l’écart entre l’abstraction de l’agent et le modèle d’exécution du flux de travail. Elle effectue les actions suivantes :

  • Reçoit des messages typés à partir du graphe de flux de travail et les transfère à l’agent sous-jacent.
  • Gère la session et l'état de conversation de l'agent au cours des opérations.
  • Adapte son comportement en fonction du mode d’exécution du flux de travail (streaming ou non-diffusion en continu).
  • Génère des événements de sortie (AgentResponse ou AgentResponseUpdate) à l’appelant de workflow pour observation.
  • Envoie des messages aux exécuteurs en aval connectés pour un traitement continu dans le graphique.
  • Prend en charge le point de contrôle pour les flux de travail de longue durée.

Fonctionnement

En C#, le moteur de flux de travail crée en interne un AIAgentHostExecutor pour chaque AIAgent ajout à un flux de travail. Cet exécuteur spécialisé étend ChatProtocolExecutor et utilise un modèle de turn token :

  1. Mise en cache des messages : lorsque les messages arrivent d’autres exécuteurs, l’exécuteur de l’agent les collecte. S’il ForwardIncomingMessages est activé (valeur par défaut), les messages entrants sont également transférés aux exécuteurs en aval.
  2. Déclencheur de jeton : l’agent traite ses messages mis en cache uniquement après la réception d’un TurnToken.
  3. Appel de l’agent : l’exécuteur appelle RunAsync (non-streaming) ou RunStreamingAsync (streaming) sur l’agent sous-jacent.
  4. Rendement de sortie : si les événements de diffusion en continu sont activés, chaque incrémentiel AgentResponseUpdate est généré en tant que sortie de flux de travail. Si EmitAgentResponseEvents est activé, l'agrégat AgentResponse est également généré en tant que sortie de flux de travaux.
  5. Messagerie en aval : les messages de réponse de l’agent sont envoyés aux exécuteurs en aval connectés.
  6. Passage du jeton en mode pass-through : après avoir terminé son tour, l’exécuteur envoie un nouveau TurnToken en aval afin que l’agent suivant de la chaîne puisse commencer le traitement.

Conseil / Astuce

Certains scénarios peuvent nécessiter un exécuteur d’agent plus spécialisé ; par exemple, les orchestrations de transfert utilisent une logique de HandoffAgentExecutor routage personnalisée dédiée.

Création implicite ou explicite

Lorsque vous passez un AIAgent à WorkflowBuilder, l’infrastructure l’encapsule automatiquement dans un AIAgentBinding, ce qui crée le sous-jacent AIAgentHostExecutor. Vous n’avez pas besoin d’instancier directement l’exécuteur de l’agent.

AIAgent writerAgent = /* create your agent */;
AIAgent reviewerAgent = /* create your agent */;

// Agents are automatically wrapped — no manual executor creation required
var workflow = new WorkflowBuilder(writerAgent)
    .AddEdge(writerAgent, reviewerAgent)
    .Build();

Vous pouvez également utiliser les méthodes d’assistance sur AgentWorkflowBuilder pour les modèles courants :

// Build a sequential pipeline of agents
var workflow = AgentWorkflowBuilder.BuildSequential(writerAgent, reviewerAgent);

Configuration personnalisée

Pour personnaliser le comportement de l’exécuteur de l’agent, utilisez BindAsExecutor avec AIAgentHostOptions:

var options = new AIAgentHostOptions
{
    EmitAgentUpdateEvents = true,
    EmitAgentResponseEvents = true,
    ReassignOtherAgentsAsUsers = true,
    ForwardIncomingMessages = true,
};

ExecutorBinding writerBinding = writerAgent.BindAsExecutor(options);
var workflow = new WorkflowBuilder(writerBinding)
    .AddEdge(writerBinding, reviewerAgent)
    .Build();

Types d’entrée

L’exécuteur de l’agent en C# accepte plusieurs types d’entrée : string, ChatMessageet IEnumerable<ChatMessage>. Les entrées de chaîne sont automatiquement converties en instances ChatMessage ayant le rôle User. Tous les messages entrants sont accumulés jusqu’à ce qu’un TurnToken message soit reçu, auquel point l’exécuteur traite le lot. Lorsqu’il ReassignOtherAgentsAsUsers est activé (valeur par défaut), les messages d’autres agents sont réaffectés au User rôle afin que le modèle sous-jacent les traite comme des entrées utilisateur, tandis que les messages de l’agent actuel conservent le Assistant rôle.

Sortie et chaînage

Une fois que l’agent a terminé son tour, l’exécuteur :

  1. Envoie les messages de réponse de l’agent à tous les exécuteurs en aval connectés.
  2. Transfère un nouvel TurnToken afin que l’agent suivant de la chaîne puisse commencer le traitement.

Cela rend l'enchaînement des agents simple : il suffit de les connecter par des arêtes.

var workflow = new WorkflowBuilder(frenchTranslator)
    .AddEdge(frenchTranslator, spanishTranslator)
    .AddEdge(spanishTranslator, englishTranslator)
    .Build();

Comportement de diffusion en continu

Le comportement de diffusion en continu est contrôlé par l'option EmitAgentUpdateEvents sur AIAgentHostOptions, ou dynamiquement via TurnToken.

  • Lorsqu’il est activé , l’exécuteur appelle RunStreamingAsync l’agent et génère chacun AgentResponseUpdate en tant qu’événement de sortie de flux de travail. Cela fournit des mises à jour de jeton par jeton en temps réel.
  • En cas de désactivation : l’exécuteur appelle RunAsync et produit une seule réponse complète.
// Enable streaming events at the configuration level
var options = new AIAgentHostOptions
{
    EmitAgentUpdateEvents = true,
};

// Or enable streaming dynamically via TurnToken
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

Sessions partagées

Chaque exécuteur d’agent gère sa propre session par défaut. Pour partager une session entre les agents, configurez les agents avec un fournisseur de session commun avant de les ajouter au flux de travail.

Options de configuration

AIAgentHostOptions contrôle le comportement de l’exécuteur d'agent :

Choix Par défaut Description
EmitAgentUpdateEvents null Émettre des événements de mise à jour de streaming au cours de l’exécution. TurnToken est prioritaire si la valeur est définie. Si les deux sont null, la diffusion en continu est désactivée.
EmitAgentResponseEvents false Émettez la réponse de l’agent agrégé en tant qu’événement de sortie de flux de travail.
InterceptUserInputRequests false Interceptez et routez-le UserInputRequestContent en tant que message de flux de travail pour traitement.
InterceptUnterminatedFunctionCalls false Interceptez FunctionCallContent sans résultat correspondant et routez-le en tant que message de flux de travail.
ReassignOtherAgentsAsUsers true Réaffectez les messages d’autres agents au User rôle afin que le modèle les traite comme des entrées utilisateur.
ForwardIncomingMessages true Transférez les messages entrants vers les exécuteurs en aval avant les messages générés par l’agent.

Points de contrôle

L’exécuteur de l’agent prend en charge la gestion des points de sauvegarde pour les processus de longue durée. Lorsqu’un point de contrôle est effectué, l’exécuteur sérialise :

  • L'état de session de l'agent (via SerializeSessionAsync).
  • La configuration d’émission d’événements du tour actuel (présente uniquement pendant que les requêtes sont en attente et que l’exécuteur n’a pas encore produit son entrant TurnToken).
  • Requêtes d'entrée utilisateur en attente et requêtes d'appel de fonction.

Lors de la restauration, l’exécuteur désérialise la session et l’état de la requête en attente, permettant ainsi au flux de travail de reprendre là où il s'est arrêté.

Fonctionnement

La AgentExecutor classe encapsule un agent qui implémente le SupportsAgentRun protocole. Lorsque l’exécuteur reçoit un message :

  1. Normalisation des messages : l’entrée est normalisée dans une liste d’objets Message et ajoutée au cache interne de l’exécuteur. L’exécuteur accepte plusieurs types d’entrée ( str, , Messagelist[str | Message], AgentExecutorRequestet AgentExecutorResponse ) chaque routé vers un gestionnaire dédié qui normalise l’entrée avant la mise en cache.
  2. Appel de l’agent : l’exécuteur appelle agent.run() avec les messages mis en cache, en sélectionnant automatiquement le mode streaming ou hors streaming en fonction du mode d’exécution du flux de travail.
  3. Émission de sortie : en mode streaming, chacun AgentResponseUpdate est généré en tant qu’événement de sortie de flux de travail. En mode hors streaming, un seul AgentResponse est généré.
  4. Répartition en aval : une fois l’agent terminé, l’exécuteur envoie un AgentExecutorResponse à tous les exécuteurs en aval connectés. Cette réponse inclut l’historique complet des conversations, ce qui permet un chaînage transparent.
  5. Réinitialisation du cache : le cache de messages internes de l’exécuteur est effacé après l’appel de l’agent, ce qui garantit que chaque appel d’agent traite uniquement les nouveaux messages reçus depuis la dernière appel.

Conseil / Astuce

Certains scénarios peuvent nécessiter un exécuteur d’agent plus spécialisé ; par exemple, les orchestrations de transfert utilisent un exécuteur dédié avec une logique de routage personnalisée.

Création implicite ou explicite

Les WorkflowBuilder encapsulent automatiquement les agents dans des instances de AgentExecutor lorsque vous transmettez un agent directement. Pour la plupart des flux de travail, la création implicite est suffisante :

from agent_framework import WorkflowBuilder

writer_agent = client.as_agent(name="Writer", instructions="...")
reviewer_agent = client.as_agent(name="Reviewer", instructions="...")

# Agents are automatically wrapped — no manual AgentExecutor creation required
workflow = (
    WorkflowBuilder(start_executor=writer_agent)
    .add_edge(writer_agent, reviewer_agent)
    .build()
)

Création explicite

Créez une AgentExecutor explicitement lorsque vous devez :

  • Partagez une session entre plusieurs agents.
  • Fournissez un ID d’exécuteur personnalisé pour le routage et les kwargs d’exécution ciblés.
  • Référencez la même instance d’exécuteur dans plusieurs arêtes.
from agent_framework import AgentExecutor

writer_executor = AgentExecutor(writer_agent, id="my-writer")
reviewer_executor = AgentExecutor(reviewer_agent, id="my-reviewer")

workflow = (
    WorkflowBuilder(start_executor=writer_executor)
    .add_edge(writer_executor, reviewer_executor)
    .build()
)

Paramètres du constructeur :

Paramètre Type Description
agent SupportsAgentRun Agent à encapsuler.
session AgentSession \| None Session à utiliser pour les exécutions d'agents. Si None, une nouvelle session est créée par l'agent.
id str \| None ID d’exécuteur unique. Par défaut, le nom de l'agent est utilisé si disponible.

Conseil / Astuce

L’ID d’exécuteur est également la clé utilisée lorsque vous ciblez workflow.run(function_invocation_kwargs=...) ou client_kwargs= à des agents individuels. Si vous omettez id, le flux de travail utilise le nom de l’agent encapsulé.

Types d’entrée

Le AgentExecutor définit plusieurs méthodes de gestionnaire, chacune acceptant un type d’entrée différent. Le moteur de flux de travail distribue automatiquement le gestionnaire approprié en fonction du type de message. Tous les types d’entrée déclenchent l’exécution immédiate de l’agent, sauf pour AgentExecutorRequest où l’indicateur should_respond contrôle si l’agent s’exécute ou met simplement en cache les messages :

Type d’entrée Gestionnaire Agent déclencheurs Description
AgentExecutorRequest run Conditionnelle Type d’entrée canonique. Contient une liste de messages et un should_respond indicateur qui contrôle si l’agent s’exécute.
str from_str Toujours Accepte une chaîne de texte brute.
Message from_message Toujours Accepte un seul Message objet.
list[str \| Message] from_messages Toujours Accepte une liste de chaînes ou Message d’objets en tant que contexte de conversation.
AgentExecutorResponse from_response Toujours Accepte la réponse d’un exécuteur d’agent antérieur, ce qui active le chaînage direct.

Utilisation de AgentExecutorRequest

AgentExecutorRequest est le type d’entrée canonique et fournit le plus de contrôle :

from agent_framework import AgentExecutorRequest, Message

# Create a request with messages
request = AgentExecutorRequest(
    messages=[Message(role="user", contents=["Hello, world!"])],
    should_respond=True,
)

# Run the workflow
result = await workflow.run(request)

L’indicateur should_respond contrôle si l’agent traite immédiatement les messages ou les met simplement en cache ultérieurement :

  • True (valeur par défaut) : l’agent s’exécute et produit une réponse.
  • False — les messages sont ajoutés au cache, mais l’agent ne s’exécute pas. Cela est utile pour précharger le contexte de conversation avant de déclencher une réponse.

Sortie et chaînage

Une fois l’agent terminé, l’exécuteur envoie un AgentExecutorResponse en aval. Cette classe de données contient :

Champ Type Description
executor_id str ID de l’exécuteur qui a produit la réponse.
agent_response AgentResponse Réponse de l’agent sous-jacent (inchangée par rapport au client).
full_conversation list[Message] \| None Contexte de conversation complet (entrées antérieures + sorties de l’agent) pour l'enchaînement des conversations.

Lors du chaînage des exécuteurs d’agents, l’exécuteur en aval reçoit le AgentExecutorResponse via le gestionnaire from_response. Il utilise le champ pour conserver l’historique full_conversation complet des conversations, ce qui empêche les agents en aval de perdre le contexte antérieur :

spam_detector = AgentExecutor(create_spam_detector_agent())
email_assistant = AgentExecutor(create_email_assistant_agent())

# The email_assistant receives the spam_detector's full conversation context
workflow = (
    WorkflowBuilder(start_executor=spam_detector)
    .add_edge(spam_detector, email_assistant)
    .build()
)

Comportement de diffusion en continu

AgentExecutor s’adapte automatiquement au mode d’exécution du workflow :

  • stream=True — appelle agent.run(stream=True) et génère chacun AgentResponseUpdate en tant qu’événement de sortie de flux de travail. Une fois la diffusion en continu terminée, les mises à jour sont agrégées dans une version complète AgentResponse pour envoi en aval.
  • stream=False (valeur par défaut) : appelle agent.run(stream=False) et génère un seul AgentResponse événement de sortie de flux de travail.
# Streaming mode — receive incremental updates
events = workflow.run("Write a story about a cat.", stream=True)
async for event in events:
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        print(event.data.text, end="", flush=True)

# Non-streaming mode — receive complete response
result = await workflow.run("Write a story about a cat.")

# Retrieve AgentResponse objects from the result
outputs = result.get_outputs()
for output in outputs:
    if isinstance(output, AgentResponse):
        print(output.text)

Sessions partagées

Par défaut, chacun AgentExecutor crée sa propre session. Pour partager une session entre plusieurs agents (par exemple, pour gérer un thread de conversation courant), créez une session explicitement et transmettez-la à chaque exécuteur :

from agent_framework import AgentExecutor

# Create a shared session from one agent
shared_session = writer_agent.create_session()

# Both executors share the same session
writer_executor = AgentExecutor(writer_agent, session=shared_session)
reviewer_executor = AgentExecutor(reviewer_agent, session=shared_session)

Note

Tous les agents ne prennent pas en charge les sessions partagées. En règle générale, seuls les agents du même type de fournisseur peuvent partager une session.

Points de contrôle

Le AgentExecutor prend en charge le point de contrôle pour enregistrer et restaurer l'état dans les workflows de longue durée. Lorsqu’un point de contrôle est effectué, l’exécuteur sérialise :

  • Cache de messages interne.
  • Historique complet des conversations.
  • État de session de l’agent.
  • Toutes les demandes et réponses d’entrée utilisateur en attente.

Lors de la restauration, l’exécuteur désérialise cet état, ce qui permet au flux de travail de reprendre à partir de l’endroit où il s’est arrêté.

Avertissement

Le point de contrôle avec les agents qui utilisent des sessions côté serveur (par exemple FoundryAgent) présente des limitations. L’état de session côté serveur n’est pas capturé dans les points de contrôle et peut être modifié par les exécutions suivantes. Envisagez d’implémenter un exécuteur personnalisé si vous avez besoin de points de contrôle fiables avec des sessions côté serveur.

Étapes suivantes