Delen via


Agentuitvoerder

Wanneer u een AI-agent aan een werkstroom toevoegt, moet deze worden verpakt in een uitvoerprogramma, zodat de werkstroomengine berichten naar deze agent kan routeren, de sessiestatus kan beheren en de uitvoer kan afhandelen. De Agent Executor is de ingebouwde executor die deze aanpassing afhandelt.

Overzicht

De agentuitvoering overbrugt de kloof tussen de agentabstractie en het werkstroomuitvoeringsmodel. Het:

  • Ontvangt getypte berichten uit de werkstroomgrafiek en stuurt deze door naar de onderliggende agent.
  • Beheert de sessie- en gespreksstatus van de agent tussen uitvoeringen.
  • Past zijn gedrag aan op basis van de uitvoeringsmodus van de workflow (streaming of niet-streaming).
  • Levert uitvoerevenementen (AgentResponse of AgentResponseUpdate) op aan de aanroeper van de workflow ter observatie.
  • Hiermee worden berichten verzonden naar verbonden downstreamexecutors voor continue verwerking in de grafiek.
  • Ondersteunt controlepunten voor langlopende werkstromen.

Hoe het werkt

In C# creëert de werkstroomengine intern een AIAgentHostExecutor voor elke AIAgent die aan een werkstroom is toegevoegd. Deze gespecialiseerde uitvoerder breidt ChatProtocolExecutor uit en maakt gebruik van een turntokenpatroon :

  1. Berichtcaching : wanneer berichten van andere uitvoerders binnenkomen, verzamelt de agentexecutor deze. Als ForwardIncomingMessages deze optie is ingeschakeld (de standaardinstelling), worden de binnenkomende berichten ook doorgestuurd naar downstreamexecutors.
  2. Token-trigger activeren — de agent verwerkt de berichten in de cache pas na ontvangst van een TurnToken.
  3. Aanroep van een agent: de uitvoerder roept RunAsync aan voor niet-streaming of RunStreamingAsync voor streaming op de onderliggende agent.
  4. Oplevering van uitvoer — als streaming gebeurtenissen zijn ingeschakeld, wordt elke incrementele als werkstroomuitvoer geleverd. Als EmitAgentResponseEvents is ingeschakeld, wordt de geaggregeerde AgentResponse ook als werkstroomuitvoer opgeleverd.
  5. Downstream-berichten : de antwoordberichten van de agent worden verzonden naar verbonden downstreamexecutors.
  6. Tokenpassthrough : nadat de uitvoering is voltooid, verzendt de uitvoerder een nieuwe TurnToken downstream, zodat de volgende agent in de keten kan beginnen met verwerken.

Aanbeveling

Voor sommige scenario's is mogelijk een meer gespecialiseerde agentexecutor vereist; Handoff-indelingen maken bijvoorbeeld gebruik van een toegewezen HandoffAgentExecutor met aangepaste routeringslogica.

Impliciet versus expliciet creatie

Wanneer u een AIAgent aan WorkflowBuilder doorgeeft, wikkelt de framework het automatisch in een AIAgentBinding, wat de onderliggende AIAgentHostExecutor creëert. U hoeft de agentexecutor niet rechtstreeks te instantiëren.

AIAgent writerAgent = /* create your agent */;
AIAgent reviewerAgent = /* create your agent */;

// Agents are automatically wrapped — no manual executor creation required
var workflow = new WorkflowBuilder(writerAgent)
    .AddEdge(writerAgent, reviewerAgent)
    .Build();

U kunt ook de helpermethoden AgentWorkflowBuilder gebruiken voor veelvoorkomende patronen:

// Build a sequential pipeline of agents
var workflow = AgentWorkflowBuilder.BuildSequential(writerAgent, reviewerAgent);

Aangepaste configuratie

Om het gedrag van de agent-executor aan te passen, gebruikt u BindAsExecutor met AIAgentHostOptions:

var options = new AIAgentHostOptions
{
    EmitAgentUpdateEvents = true,
    EmitAgentResponseEvents = true,
    ReassignOtherAgentsAsUsers = true,
    ForwardIncomingMessages = true,
};

ExecutorBinding writerBinding = writerAgent.BindAsExecutor(options);
var workflow = new WorkflowBuilder(writerBinding)
    .AddEdge(writerBinding, reviewerAgent)
    .Build();

Invoertypen

De agentuitvoering in C# accepteert meerdere invoertypen: string, ChatMessageen IEnumerable<ChatMessage>. Tekenreeksinvoer wordt automatisch omgezet naar ChatMessage-instanties met de rol User. Alle binnenkomende berichten worden verzameld totdat een TurnToken bericht wordt ontvangen, waarna de uitvoerder de batch verwerkt. Wanneer ReassignOtherAgentsAsUsers deze optie is ingeschakeld (de standaardinstelling), worden berichten van andere agents opnieuw toegewezen aan de User rol, zodat het onderliggende model ze behandelt als gebruikersinvoer, terwijl berichten van de huidige agent de Assistant rol behouden.

Uitvoer en keten

Nadat de agent zijn beurt heeft voltooid, voert de uitvoerder het volgende uit:

  1. Hiermee worden de antwoordberichten van de agent verzonden naar alle verbonden downstreamexecutors.
  2. Hiermee wordt een nieuwe TurnToken doorgestuurd, zodat de volgende agent in de keten kan met de verwerking beginnen.

Dit maakt het koppelen van agents eenvoudig: verbind ze eenvoudig met randen:

var workflow = new WorkflowBuilder(frenchTranslator)
    .AddEdge(frenchTranslator, spanishTranslator)
    .AddEdge(spanishTranslator, englishTranslator)
    .Build();

Streaminggedrag

Streaminggedrag wordt bepaald door de EmitAgentUpdateEvents optie op AIAgentHostOptions, of dynamisch via het TurnToken volgende:

  • Wanneer ingeschakeld — roept de uitvoerder RunStreamingAsync aan op de agent en levert elke AgentResponseUpdate als een workflow-uitvoerevenement. Dit biedt realtime token-voor-token-updates.
  • Als dit is uitgeschakeld, roept de uitvoerder aan en produceert een volledige respons.
// Enable streaming events at the configuration level
var options = new AIAgentHostOptions
{
    EmitAgentUpdateEvents = true,
};

// Or enable streaming dynamically via TurnToken
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

Gedeelde sessies

Elke agentexecutor onderhoudt standaard een eigen sessie. Als u een sessie tussen agents wilt delen, configureert u de agents met een algemene sessieprovider voordat u deze toevoegt aan de werkstroom.

Configuratieopties

AIAgentHostOptions bepaalt het gedrag van de agentuitvoerders:

Option Verstek Beschrijving
EmitAgentUpdateEvents null Streaming-update-gebeurtenissen verzenden tijdens de uitvoering. TurnToken heeft voorrang als deze is ingesteld. Als beide zijn null, is streaming uitgeschakeld.
EmitAgentResponseEvents false Het antwoord van de geaggregeerde agent verzenden als een werkstroomuitvoergebeurtenis.
InterceptUserInputRequests false Onderscheppen UserInputRequestContent en routeren als een werkstroombericht voor verwerking.
InterceptUnterminatedFunctionCalls false Onderschep FunctionCallContent zonder een bijbehorend resultaat en routeer het als een workflow-bericht.
ReassignOtherAgentsAsUsers true Wijs berichten van andere agents opnieuw toe aan de User rol, zodat het model ze behandelt als gebruikersinvoer.
ForwardIncomingMessages true Binnenkomende berichten doorsturen naar uitvoerders stroomafwaarts voordat de door de agent gegenereerde berichten worden verzonden.

Controlepunten maken

De agentuitvoering ondersteunt controlepunten voor langlopende werkstromen. Wanneer een controlepunt wordt genomen, serialiseert de uitvoerder het volgende:

  • De sessiestatus van de agent (via SerializeSessionAsync).
  • De configuratie voor gebeurtenisuitgifte van de huidige cyclus (alleen aanwezig terwijl aanvragen in behandeling zijn en de uitvoerder de binnenkomende TurnToken gebeurtenis nog niet heeft opgeleverd).
  • Alle aanvragen voor in behandeling zijnde gebruikersinvoer en aanvragen voor functieoproepen.

Bij het herstellen worden de sessie en de status van de lopende aanvraag gedeserialiseerd, zodat de workflow kan worden hervat vanaf het punt waar het was gebleven.

Hoe het werkt

De AgentExecutor klasse verpakt een agent die het SupportsAgentRun protocol implementeert. Wanneer de uitvoerder een bericht ontvangt:

  1. Berichtnormalisatie : de invoer wordt genormaliseerd in een lijst Message met objecten en toegevoegd aan de interne cache van de uitvoerder. De uitvoerder accepteert meerdere invoertypen – str, Message, list[str | Message], AgentExecutorRequest en AgentExecutorResponse – die worden gerouteerd naar een toegewezen handler die de invoer normaliseert voordat deze wordt gecachet.
  2. Aanroep van agent: de uitvoerder roept agent.run() de berichten in de cache aan, waarbij automatisch streaming- of niet-streamingmodus wordt geselecteerd op basis van de uitvoeringsmodus van de werkstroom.
  3. Uitvoeruitgifte : in de streamingmodus wordt elk AgentResponseUpdate gegenereerd als een werkstroomuitvoergebeurtenis. In de niet-streaming-modus wordt er één AgentResponse opgeleverd.
  4. Downstream-verzending — nadat de agent klaar is, stuurt de uitvoerder een AgentExecutorResponse naar alle verbonden downstreamexecutors. Dit antwoord omvat de volledige gespreksgeschiedenis, waardoor naadloze ketening mogelijk is.
  5. Cacheherstel : de interne berichtcache van de uitvoerder wordt gewist nadat de agent is aangeroepen, zodat elke agentaanroep alleen nieuwe berichten verwerkt die zijn ontvangen sinds de laatste aanroep.

Aanbeveling

Voor sommige scenario's is mogelijk een meer gespecialiseerde agentexecutor vereist; Handoff-indelingen maken bijvoorbeeld gebruik van een toegewezen uitvoerder met aangepaste routeringslogica.

Impliciet versus expliciet creatie

Wanneer je direct een agent doorgeeft, worden de WorkflowBuilder-agents automatisch verpakt in AgentExecutor-instanties. Voor de meeste werkstromen is impliciet maken voldoende:

from agent_framework import WorkflowBuilder

writer_agent = client.as_agent(name="Writer", instructions="...")
reviewer_agent = client.as_agent(name="Reviewer", instructions="...")

# Agents are automatically wrapped — no manual AgentExecutor creation required
workflow = (
    WorkflowBuilder(start_executor=writer_agent)
    .add_edge(writer_agent, reviewer_agent)
    .build()
)

Expliciete creatie

AgentExecutor Maak een expliciete aanvraag wanneer u het volgende moet doen:

  • Deel een sessie tussen meerdere agents.
  • Geef een aangepaste uitvoerings-ID op voor routering en gerichte runtime kwargs.
  • Verwijs naar hetzelfde uitvoerderexemplaar in meerdere verbindingen.
from agent_framework import AgentExecutor

writer_executor = AgentExecutor(writer_agent, id="my-writer")
reviewer_executor = AgentExecutor(reviewer_agent, id="my-reviewer")

workflow = (
    WorkflowBuilder(start_executor=writer_executor)
    .add_edge(writer_executor, reviewer_executor)
    .build()
)

Constructor parameters:

Kenmerk Typ Beschrijving
agent SupportsAgentRun De agent die moet worden verpakt.
session AgentSession \| None Sessie die moet worden gebruikt voor agent-runs. Als er None een nieuwe sessie wordt gemaakt door de agent.
id str \| None Unieke uitvoerder-id. Standaard ingesteld op de naam van de agent, indien beschikbaar.

Aanbeveling

De uitvoerders-id is ook de sleutel gebruikt voor het richten op workflow.run(function_invocation_kwargs=...) of client_kwargs= bij afzonderlijke agenten. Als u weglaat id, gebruikt de werkstroom de naam van de verpakte agent.

Invoertypen

Hiermee AgentExecutor worden meerdere handlermethoden gedefinieerd, die elk een ander invoertype accepteren. De werkstroomengine verzendt automatisch de juiste handler op basis van het berichttype. Alle invoertypen activeren de agent om onmiddellijk te worden uitgevoerd, behalve AgentExecutorRequest waar de should_respond vlag bepaalt of de agent wordt uitgevoerd of dat de berichten alleen in de cache worden opgeslagen.

Invoertype Behandelaar Triggers Agent Beschrijving
AgentExecutorRequest run Conditional Het canonieke invoertype. Bevat een lijst met berichten en een should_respond vlag die bepaalt of de agent actief is.
str from_str Altijd Accepteert een onbewerkte tekenreeksprompt.
Message from_message Altijd Accepteert één Message object.
list[str \| Message] from_messages Altijd Accepteert een lijst met tekenreeksen of Message objecten als gesprekscontext.
AgentExecutorResponse from_response Altijd Accepteert het antwoord van een eerdere agentexecutor, waardoor directe ketening mogelijk is.

AgentExecutorRequest gebruiken

AgentExecutorRequest is het canonieke invoertype en biedt de meeste controle:

from agent_framework import AgentExecutorRequest, Message

# Create a request with messages
request = AgentExecutorRequest(
    messages=[Message(role="user", contents=["Hello, world!"])],
    should_respond=True,
)

# Run the workflow
result = await workflow.run(request)

Met should_respond de vlag bepaalt u of de agent de berichten onmiddellijk verwerkt of gewoon in de cache opgeslagen voor later gebruik:

  • True (standaard): de agent draait en produceert een antwoord.
  • False — de berichten worden toegevoegd aan de cache, maar de agent wordt niet uitgevoerd. Dit is handig voor het vooraf laden van gesprekscontext voordat u een antwoord activeert.

Uitvoer en keten

Nadat de agent is voltooid, verzendt de uitvoerder een AgentExecutorResponse downstream. Deze dataclass bevat:

Veld Typ Beschrijving
executor_id str De ID van de uitvoerder dat het antwoord heeft geproduceerd.
agent_response AgentResponse Het onderliggende agentantwoord (ongewijzigd van de cliënt).
full_conversation list[Message] \| None De volledige gesprekscontext (eerdere invoer en agentuitvoer) voor het koppelen.

Bij het koppelen van agentexecutors ontvangt de downstreamexecutor de AgentExecutorResponse via de from_response handler. Het veld gebruikt het full_conversation veld om de volledige gespreksgeschiedenis te behouden, waardoor downstreamagents geen eerdere context verliezen:

spam_detector = AgentExecutor(create_spam_detector_agent())
email_assistant = AgentExecutor(create_email_assistant_agent())

# The email_assistant receives the spam_detector's full conversation context
workflow = (
    WorkflowBuilder(start_executor=spam_detector)
    .add_edge(spam_detector, email_assistant)
    .build()
)

Streaminggedrag

De AgentExecutor past zich automatisch aan de uitvoeringsmodus van de werkstroom aan.

  • stream=True — roept agent.run(stream=True) op en levert elk AgentResponseUpdate resultaat op als gebeurtenis voor de werkstroomuitvoer. Nadat het streamen is voltooid, worden de updates samengevoegd tot een volledige AgentResponse voor downstream-verzending.
  • stream=False (standaard): roept agent.run(stream=False) aan en levert één AgentResponse als een workflow-uitvoergebeurtenis.
# Streaming mode — receive incremental updates
events = workflow.run("Write a story about a cat.", stream=True)
async for event in events:
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        print(event.data.text, end="", flush=True)

# Non-streaming mode — receive complete response
result = await workflow.run("Write a story about a cat.")

# Retrieve AgentResponse objects from the result
outputs = result.get_outputs()
for output in outputs:
    if isinstance(output, AgentResponse):
        print(output.text)

Gedeelde sessies

Standaard creëert elke AgentExecutor zijn eigen sessie. Als u een sessie tussen meerdere agents wilt delen (bijvoorbeeld om een algemene gespreksthread te onderhouden), maakt u expliciet een sessie en geeft u deze door aan elke uitvoerder:

from agent_framework import AgentExecutor

# Create a shared session from one agent
shared_session = writer_agent.create_session()

# Both executors share the same session
writer_executor = AgentExecutor(writer_agent, session=shared_session)
reviewer_executor = AgentExecutor(reviewer_agent, session=shared_session)

Opmerking

Niet alle agents ondersteunen gedeelde sessies. Normaal gesproken kunnen alleen agents van hetzelfde providertype een sessie delen.

Controlepunten maken

De AgentExecutor ondersteunt controlepunten voor het opslaan en herstellen van de status in langlopende werkstromen. Wanneer een controlepunt wordt genomen, serialiseert de uitvoerder het volgende:

  • De interne berichtencache.
  • De volledige gespreksgeschiedenis.
  • De sessiestatus van de agent.
  • Alle aanvragen en antwoorden voor gebruikersinvoer die in behandeling zijn.

Bij herstel wordt deze toestand door de uitvoerder gedeserialiseerd, waardoor de werkstroom kan worden hervat vanaf waar deze was gebleven.

Waarschuwing

Controlepunten met agents die server-side sessies gebruiken (zoals FoundryAgent) kennen beperkingen. Sessiestatus aan de serverzijde wordt niet vastgelegd in controlepunten en kan worden gewijzigd door volgende uitvoeringen. Overweeg om een aangepaste uitvoerder te implementeren als u betrouwbare controlepunten nodig hebt met sessies aan de serverzijde.

Volgende stappen