Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
När du lägger till en AI-agent i ett arbetsflöde måste den omslutas i en köre så att arbetsflödesmotorn kan dirigera meddelanden till den, hantera dess sessionstillstånd och hantera dess utdata. Agentutförare är den inbyggda utföraren som hanterar den här anpassningen.
Översikt
Agentexekutor överbryggar klyftan mellan agentabstraktionen och arbetsflödets körningsmodell. Det:
- Tar emot inskrivna meddelanden från arbetsflödesdiagrammet och vidarebefordrar dem till den underliggande agenten.
- Hanterar agentens session och samtalstillstånd mellan körningar.
- Anpassar sitt beteende baserat på arbetsflödets körningsläge (direktuppspelning eller icke-direktuppspelning).
- Ger utdatahändelser (
AgentResponseellerAgentResponseUpdate) till arbetsflödesanroparen för observation. - Skickar meddelanden till anslutna underordnade utförare för fortsatt bearbetning i diagrammet.
- Stöder kontrollpunkter för långvariga arbetsflöden.
Så här fungerar det
I C# skapar arbetsflödesmotorn internt en AIAgentHostExecutor för varje AIAgent som läggs till i ett arbetsflöde. Den här specialiserade utföraren utökar ChatProtocolExecutor och använder ett vände-token-mönster:
-
Meddelandecaching - när meddelanden kommer från andra utförare samlar agentutföraren in dem. Om
ForwardIncomingMessagesär aktiverat (standard) vidarebefordras även inkommande meddelanden till underordnade utförare. -
Aktivera tokenutlösare – agenten bearbetar endast cachelagrade meddelanden efter att ha tagit emot en
TurnToken. -
Agentanrop – exekutoranrop
RunAsync(icke-direktuppspelning) ellerRunStreamingAsync(direktuppspelning) på den underliggande agenten. -
Utdata generering – om strömmande händelser aktiveras ges varje inkrementell
AgentResponseUpdatesom ett arbetsflödesutdata. OmEmitAgentResponseEventsär aktiverat genereras även aggregeradeAgentResponsesom ett arbetsflödesutdata. - Nedströmsmeddelanden – agentens svarsmeddelanden skickas till anslutna underordnade utförare.
-
Aktivera genomströmning av token – när du har slutfört sin tur skickar kören en ny
TurnTokennedströms så att nästa agent i kedjan kan börja bearbetas.
Tips/Råd
Vissa scenarier kan kräva en mer specialiserad agentexekutor. Till exempel använder överlämningsorkestreringar en dedikerad HandoffAgentExecutor med anpassad routningslogik.
Implicit kontra explicit skapande
När du skickar en AIAgent till WorkflowBuilder, omsluter ramverket automatiskt det i en AIAgentBinding, vilket skapar den underliggande AIAgentHostExecutor. Du behöver inte instansiera agentexekutor direkt.
AIAgent writerAgent = /* create your agent */;
AIAgent reviewerAgent = /* create your agent */;
// Agents are automatically wrapped — no manual executor creation required
var workflow = new WorkflowBuilder(writerAgent)
.AddEdge(writerAgent, reviewerAgent)
.Build();
Du kan också använda hjälpmetoderna på AgentWorkflowBuilder för vanliga mönster:
// Build a sequential pipeline of agents
var workflow = AgentWorkflowBuilder.BuildSequential(writerAgent, reviewerAgent);
Anpassad konfiguration
Om du vill anpassa hur agentexekutor fungerar använder du BindAsExecutor med AIAgentHostOptions:
var options = new AIAgentHostOptions
{
EmitAgentUpdateEvents = true,
EmitAgentResponseEvents = true,
ReassignOtherAgentsAsUsers = true,
ForwardIncomingMessages = true,
};
ExecutorBinding writerBinding = writerAgent.BindAsExecutor(options);
var workflow = new WorkflowBuilder(writerBinding)
.AddEdge(writerBinding, reviewerAgent)
.Build();
Indatatyper
Agentexekutor i C# accepterar flera indatatyper: string, ChatMessageoch IEnumerable<ChatMessage>. Strängindata konverteras automatiskt till ChatMessage instanser med User rollen. Alla inkommande meddelanden ackumuleras tills en TurnToken tas emot, varefter exekutorn bearbetar batchen. När ReassignOtherAgentsAsUsers är aktiverat (standard), tilldelas meddelanden från andra agenter till User-rollen så att den underliggande modellen behandlar dem som användarindata, medan meddelanden från den aktuella agenten behåller Assistant-rollen.
Utdata och länkning
När agenten har slutfört sin tur exekveraren:
- Skickar agentens svarsmeddelanden till alla anslutna underordnade utförare.
- Vidarebefordrar en ny
TurnTokenså att nästa agent i kedjan kan börja bearbeta den.
Detta gör länkningsagenter enkla – anslut dem helt enkelt med kanter:
var workflow = new WorkflowBuilder(frenchTranslator)
.AddEdge(frenchTranslator, spanishTranslator)
.AddEdge(spanishTranslator, englishTranslator)
.Build();
Beteende för direktuppspelning
Strömningsbeteendet styrs av EmitAgentUpdateEvents alternativet på AIAgentHostOptions, eller dynamiskt via TurnToken:
-
När den är aktiverad anropar exekutorn
RunStreamingAsyncpå agenten och returnerar varjeAgentResponseUpdatesom en händelse i arbetsflödets utdata. Detta ger realtidsuppdateringar för varje token. -
När den är inaktiverad anropar utföraren
RunAsyncoch genererar ett enda fullständigt svar.
// Enable streaming events at the configuration level
var options = new AIAgentHostOptions
{
EmitAgentUpdateEvents = true,
};
// Or enable streaming dynamically via TurnToken
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
Delade sessioner
Varje agentexekutor underhåller sin egen session som standard. Om du vill dela en session mellan agenter konfigurerar du agenterna med en gemensam sessionsprovider innan du lägger till dem i arbetsflödet.
Konfigurationsalternativ
AIAgentHostOptions styr agentexekutorns beteende:
| Option | Standardinställning | Beskrivning |
|---|---|---|
EmitAgentUpdateEvents |
null |
Skicka ut strömmande uppdateringshändelser under utförandet.
TurnToken har företräde om det anges. Om båda är nullinaktiveras direktuppspelningen. |
EmitAgentResponseEvents |
false |
Generera det aggregerade agentsvaret som en utdatahändelse för arbetsflödet. |
InterceptUserInputRequests |
false |
Fånga upp UserInputRequestContent och skicka det vidare som ett arbetsflödesmeddelande för hantering. |
InterceptUnterminatedFunctionCalls |
false |
Fånga upp FunctionCallContent där det saknas ett motsvarande resultat och skicka som ett arbetsflödesmeddelande. |
ReassignOtherAgentsAsUsers |
true |
Omtilldela meddelanden från andra agenter till User-rollen så att modellen behandlar dem som användarens indata. |
ForwardIncomingMessages |
true |
Vidarebefordra inkommande meddelanden till underordnade utförare före agentens genererade meddelanden. |
Kontrollpunkter
Agentexekutor har stöd för kontrollpunkter för långvariga arbetsflöden. När en kontrollpunkt tas serialiserar utföraren:
- Agentens sessionsstatus (via
SerializeSessionAsync). - Den aktuella svängens händelseutsläppskonfiguration (endast närvarande medan begäranden väntar och utföraren ännu inte har gett sin inkommande
TurnToken). - Väntande användarinmatningsförfrågningar och begäranden om funktionsanrop.
Vid återställning deserialiserar exekveraren sessionen och förfrågningens pågående tillstånd, vilket gör att arbetsflödet kan återupptas där det avslutades.
Så här fungerar det
Klassen AgentExecutor omsluter en agent som implementerar SupportsAgentRun protokollet. När exekutorn tar emot ett meddelande:
-
Meddelandenormalisering – indata normaliseras till en lista över
Messageobjekt och läggs till i den interna cachen för den körbara filen. Kören accepterar flera indatatyper –str,Message,list[str | Message],AgentExecutorRequestochAgentExecutorResponse– som varje dirigeras till en dedikerad hanterare som normaliserar indata före cachelagring. -
Agentanrop – kören anropar
agent.run()med cachelagrade meddelanden och väljer automatiskt direktuppspelnings- eller icke-direktuppspelningsläge baserat på arbetsflödets körningsläge. -
Utdataemission – i strömningsläge genereras varje
AgentResponseUpdatesom en arbetsflödeshändelse. I icke-direktuppspelningsläge returneras en endaAgentResponse. -
Nedströmssändning – när agenten har slutfört, skickar exekutorn en
AgentExecutorResponsetill alla anslutna nedströms exekutorer. Det här svaret innehåller den fullständiga konversationshistoriken, vilket möjliggör sömlös länkning. - Cacheåterställning – exekutörens interna meddelandecache rensas när agentens anrop görs, vilket säkerställer att varje anrop av agenten endast bearbetar nya meddelanden som tagits emot sedan det senaste anropet.
Tips/Råd
Vissa scenarier kan kräva en mer specialiserad agentutförare; till exempel använder överlämningsorkestreringar en dedicerad utförare med skräddarsydd logik för vidarebefordring.
Implicit kontra explicit skapande
WorkflowBuilder paketerar automatiskt agenten i AgentExecutor instanser när du direkt skickar en agent. För de flesta arbetsflöden räcker det med implicit skapande:
from agent_framework import WorkflowBuilder
writer_agent = client.as_agent(name="Writer", instructions="...")
reviewer_agent = client.as_agent(name="Reviewer", instructions="...")
# Agents are automatically wrapped — no manual AgentExecutor creation required
workflow = (
WorkflowBuilder(start_executor=writer_agent)
.add_edge(writer_agent, reviewer_agent)
.build()
)
Explicit skapande
Skapa en AgentExecutor explicit när du behöver:
- Dela en session mellan flera agenter.
- Ange ett anpassat utförar-ID för routing och anpassade körningsparametrar.
- Referera till samma exekveringsinstans i flera grafkanter.
from agent_framework import AgentExecutor
writer_executor = AgentExecutor(writer_agent, id="my-writer")
reviewer_executor = AgentExecutor(reviewer_agent, id="my-reviewer")
workflow = (
WorkflowBuilder(start_executor=writer_executor)
.add_edge(writer_executor, reviewer_executor)
.build()
)
Konstruktorparametrar:
| Parameter | Type | Beskrivning |
|---|---|---|
agent |
SupportsAgentRun |
Agenten som ska omslutas. |
session |
AgentSession \| None |
Session som ska användas vid agentkörningar. Om None, skapas en ny session av agenten. |
id |
str \| None |
Unikt utförar-ID. Standardinställningen är agentens namn om det är tillgängligt. |
Tips/Råd
Kör-ID:t är också den nyckel som används när du riktar workflow.run(function_invocation_kwargs=...) eller client_kwargs= mot enskilda agenter. Om du utelämnar idanvänder arbetsflödet den omslutna agentens namn.
Indatatyper
Definierar AgentExecutor flera hanteringsmetoder, var och en accepterar en annan indatatyp. Arbetsflödesmotorn skickar automatiskt rätt hanterare baserat på meddelandetypen. Alla indatatyper utlöser att agenten körs omedelbart, förutom AgentExecutorRequest där should_respond flaggan styr om agenten körs eller bara cachelagrar meddelandena:
| Indatatyp | Hanterare | Utlösaragent | Beskrivning |
|---|---|---|---|
AgentExecutorRequest |
run |
Villkorlig | Den kanoniska indatatypen. Innehåller en lista över meddelanden och en should_respond flagga som styr om agenten körs. |
str |
from_str |
Alltid | Accepterar en raw-strängprompt. |
Message |
from_message |
Alltid | Accepterar ett enskilt Message objekt. |
list[str \| Message] |
from_messages |
Alltid | Accepterar en lista med strängar eller Message objekt som konversationskontext. |
AgentExecutorResponse |
from_response |
Alltid | Accepterar en tidigare agentexekutors svar, vilket möjliggör direkt länkning. |
Användning av AgentExecutorRequest
AgentExecutorRequest är den kanoniska indatatypen och ger mest kontroll:
from agent_framework import AgentExecutorRequest, Message
# Create a request with messages
request = AgentExecutorRequest(
messages=[Message(role="user", contents=["Hello, world!"])],
should_respond=True,
)
# Run the workflow
result = await workflow.run(request)
Flaggan should_respond styr om agenten bearbetar meddelandena omedelbart eller bara cachelagrar dem för senare:
-
True(standard) – agenten kör och genererar ett svar. -
False— Meddelandena läggs till i cacheminnet men agenten körs inte. Detta är användbart för att förinläsa konversationskontexten innan du utlöser ett svar.
Utdata och länkning
När agenten är klar skickar utföraren en AgentExecutorResponse nedströms. Den här dataklassen innehåller:
| Fält | Type | Beskrivning |
|---|---|---|
executor_id |
str |
ID:t för den exekutor som genererade svaret. |
agent_response |
AgentResponse |
Det underliggande agentsvaret (oförändrat från klienten). |
full_conversation |
list[Message] \| None |
Den fullständiga konversationskontexten (tidigare indata + agentutdata) för länkning. |
Vid kedjekoppling av agentutförare tar den underordnade utföraren emot AgentExecutorResponse via hanteraren from_response. Det använder fältet full_conversation för att bevara den fullständiga konversationshistoriken, vilket förhindrar att underordnade agenter förlorar tidigare kontext:
spam_detector = AgentExecutor(create_spam_detector_agent())
email_assistant = AgentExecutor(create_email_assistant_agent())
# The email_assistant receives the spam_detector's full conversation context
workflow = (
WorkflowBuilder(start_executor=spam_detector)
.add_edge(spam_detector, email_assistant)
.build()
)
Beteende för direktuppspelning
AgentExecutor anpassas automatiskt till arbetsflödets körningsläge.
-
stream=True– anroparagent.run(stream=True)och lämnar var och enAgentResponseUpdatesom en arbetsflödesevenemang. När streaming är klar aggregeras uppdateringarna till en fullständigAgentResponseför vidaresändning. -
stream=False(standard) – anroparagent.run(stream=False)och ger en endaAgentResponsesom en arbetsflödesutdatahändelse.
# Streaming mode — receive incremental updates
events = workflow.run("Write a story about a cat.", stream=True)
async for event in events:
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
print(event.data.text, end="", flush=True)
# Non-streaming mode — receive complete response
result = await workflow.run("Write a story about a cat.")
# Retrieve AgentResponse objects from the result
outputs = result.get_outputs()
for output in outputs:
if isinstance(output, AgentResponse):
print(output.text)
Delade sessioner
Som standard skapar var och AgentExecutor en sin egen session. Om du vill dela en session mellan flera agenter (till exempel för att underhålla en gemensam konversationstråd) skapar du en session explicit och skickar den till varje köre:
from agent_framework import AgentExecutor
# Create a shared session from one agent
shared_session = writer_agent.create_session()
# Both executors share the same session
writer_executor = AgentExecutor(writer_agent, session=shared_session)
reviewer_executor = AgentExecutor(reviewer_agent, session=shared_session)
Anmärkning
Alla agenter stöder inte delade sessioner. Vanligtvis kan endast agenter av samma providertyp dela en session.
Kontrollpunkter
AgentExecutor stöder kontrollpunkter för att spara och återställa tillstånd i långvariga arbetsflöden. När en kontrollpunkt tas serialiserar utföraren:
- Den interna meddelandecachen.
- Hela konversationshistoriken.
- Agentens sessionstillstånd.
- Pågående förfrågningar och svar för användarens inmatningar.
Vid återställning av systemet deserialiserar utföraren det här tillståndet så att arbetsflödet kan fortsätta från den aktuella platsen.
Varning
Kontrollpunkter med agenter som använder sessioner på serversidan (till exempel FoundryAgent) har begränsningar. Sessionstillståndet på serversidan samlas inte in i kontrollpunkter och kan ändras av efterföljande körningar. Överväg att implementera en anpassad exekutor om du behöver tillförlitlig kontrollpunktering med sessioner på serversidan.