Dela via


Agentexekutor

När du lägger till en AI-agent i ett arbetsflöde måste den omslutas i en köre så att arbetsflödesmotorn kan dirigera meddelanden till den, hantera dess sessionstillstånd och hantera dess utdata. Agentutförare är den inbyggda utföraren som hanterar den här anpassningen.

Översikt

Agentexekutor överbryggar klyftan mellan agentabstraktionen och arbetsflödets körningsmodell. Det:

  • Tar emot inskrivna meddelanden från arbetsflödesdiagrammet och vidarebefordrar dem till den underliggande agenten.
  • Hanterar agentens session och samtalstillstånd mellan körningar.
  • Anpassar sitt beteende baserat på arbetsflödets körningsläge (direktuppspelning eller icke-direktuppspelning).
  • Ger utdatahändelser (AgentResponse eller AgentResponseUpdate) till arbetsflödesanroparen för observation.
  • Skickar meddelanden till anslutna underordnade utförare för fortsatt bearbetning i diagrammet.
  • Stöder kontrollpunkter för långvariga arbetsflöden.

Så här fungerar det

I C# skapar arbetsflödesmotorn internt en AIAgentHostExecutor för varje AIAgent som läggs till i ett arbetsflöde. Den här specialiserade utföraren utökar ChatProtocolExecutor och använder ett vände-token-mönster:

  1. Meddelandecaching - när meddelanden kommer från andra utförare samlar agentutföraren in dem. Om ForwardIncomingMessages är aktiverat (standard) vidarebefordras även inkommande meddelanden till underordnade utförare.
  2. Aktivera tokenutlösare – agenten bearbetar endast cachelagrade meddelanden efter att ha tagit emot en TurnToken.
  3. Agentanrop – exekutoranrop RunAsync (icke-direktuppspelning) eller RunStreamingAsync (direktuppspelning) på den underliggande agenten.
  4. Utdata generering – om strömmande händelser aktiveras ges varje inkrementell AgentResponseUpdate som ett arbetsflödesutdata. Om EmitAgentResponseEvents är aktiverat genereras även aggregerade AgentResponse som ett arbetsflödesutdata.
  5. Nedströmsmeddelanden – agentens svarsmeddelanden skickas till anslutna underordnade utförare.
  6. Aktivera genomströmning av token – när du har slutfört sin tur skickar kören en ny TurnToken nedströms så att nästa agent i kedjan kan börja bearbetas.

Tips/Råd

Vissa scenarier kan kräva en mer specialiserad agentexekutor. Till exempel använder överlämningsorkestreringar en dedikerad HandoffAgentExecutor med anpassad routningslogik.

Implicit kontra explicit skapande

När du skickar en AIAgent till WorkflowBuilder, omsluter ramverket automatiskt det i en AIAgentBinding, vilket skapar den underliggande AIAgentHostExecutor. Du behöver inte instansiera agentexekutor direkt.

AIAgent writerAgent = /* create your agent */;
AIAgent reviewerAgent = /* create your agent */;

// Agents are automatically wrapped — no manual executor creation required
var workflow = new WorkflowBuilder(writerAgent)
    .AddEdge(writerAgent, reviewerAgent)
    .Build();

Du kan också använda hjälpmetoderna på AgentWorkflowBuilder för vanliga mönster:

// Build a sequential pipeline of agents
var workflow = AgentWorkflowBuilder.BuildSequential(writerAgent, reviewerAgent);

Anpassad konfiguration

Om du vill anpassa hur agentexekutor fungerar använder du BindAsExecutor med AIAgentHostOptions:

var options = new AIAgentHostOptions
{
    EmitAgentUpdateEvents = true,
    EmitAgentResponseEvents = true,
    ReassignOtherAgentsAsUsers = true,
    ForwardIncomingMessages = true,
};

ExecutorBinding writerBinding = writerAgent.BindAsExecutor(options);
var workflow = new WorkflowBuilder(writerBinding)
    .AddEdge(writerBinding, reviewerAgent)
    .Build();

Indatatyper

Agentexekutor i C# accepterar flera indatatyper: string, ChatMessageoch IEnumerable<ChatMessage>. Strängindata konverteras automatiskt till ChatMessage instanser med User rollen. Alla inkommande meddelanden ackumuleras tills en TurnToken tas emot, varefter exekutorn bearbetar batchen. När ReassignOtherAgentsAsUsers är aktiverat (standard), tilldelas meddelanden från andra agenter till User-rollen så att den underliggande modellen behandlar dem som användarindata, medan meddelanden från den aktuella agenten behåller Assistant-rollen.

Utdata och länkning

När agenten har slutfört sin tur exekveraren:

  1. Skickar agentens svarsmeddelanden till alla anslutna underordnade utförare.
  2. Vidarebefordrar en ny TurnToken så att nästa agent i kedjan kan börja bearbeta den.

Detta gör länkningsagenter enkla – anslut dem helt enkelt med kanter:

var workflow = new WorkflowBuilder(frenchTranslator)
    .AddEdge(frenchTranslator, spanishTranslator)
    .AddEdge(spanishTranslator, englishTranslator)
    .Build();

Beteende för direktuppspelning

Strömningsbeteendet styrs av EmitAgentUpdateEvents alternativet på AIAgentHostOptions, eller dynamiskt via TurnToken:

  • När den är aktiverad anropar exekutorn RunStreamingAsync på agenten och returnerar varje AgentResponseUpdate som en händelse i arbetsflödets utdata. Detta ger realtidsuppdateringar för varje token.
  • När den är inaktiverad anropar utföraren RunAsync och genererar ett enda fullständigt svar.
// Enable streaming events at the configuration level
var options = new AIAgentHostOptions
{
    EmitAgentUpdateEvents = true,
};

// Or enable streaming dynamically via TurnToken
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

Delade sessioner

Varje agentexekutor underhåller sin egen session som standard. Om du vill dela en session mellan agenter konfigurerar du agenterna med en gemensam sessionsprovider innan du lägger till dem i arbetsflödet.

Konfigurationsalternativ

AIAgentHostOptions styr agentexekutorns beteende:

Option Standardinställning Beskrivning
EmitAgentUpdateEvents null Skicka ut strömmande uppdateringshändelser under utförandet. TurnToken har företräde om det anges. Om båda är nullinaktiveras direktuppspelningen.
EmitAgentResponseEvents false Generera det aggregerade agentsvaret som en utdatahändelse för arbetsflödet.
InterceptUserInputRequests false Fånga upp UserInputRequestContent och skicka det vidare som ett arbetsflödesmeddelande för hantering.
InterceptUnterminatedFunctionCalls false Fånga upp FunctionCallContent där det saknas ett motsvarande resultat och skicka som ett arbetsflödesmeddelande.
ReassignOtherAgentsAsUsers true Omtilldela meddelanden från andra agenter till User-rollen så att modellen behandlar dem som användarens indata.
ForwardIncomingMessages true Vidarebefordra inkommande meddelanden till underordnade utförare före agentens genererade meddelanden.

Kontrollpunkter

Agentexekutor har stöd för kontrollpunkter för långvariga arbetsflöden. När en kontrollpunkt tas serialiserar utföraren:

  • Agentens sessionsstatus (via SerializeSessionAsync).
  • Den aktuella svängens händelseutsläppskonfiguration (endast närvarande medan begäranden väntar och utföraren ännu inte har gett sin inkommande TurnToken).
  • Väntande användarinmatningsförfrågningar och begäranden om funktionsanrop.

Vid återställning deserialiserar exekveraren sessionen och förfrågningens pågående tillstånd, vilket gör att arbetsflödet kan återupptas där det avslutades.

Så här fungerar det

Klassen AgentExecutor omsluter en agent som implementerar SupportsAgentRun protokollet. När exekutorn tar emot ett meddelande:

  1. Meddelandenormalisering – indata normaliseras till en lista över Message objekt och läggs till i den interna cachen för den körbara filen. Kören accepterar flera indatatyper – str, Message, list[str | Message], AgentExecutorRequestoch AgentExecutorResponse – som varje dirigeras till en dedikerad hanterare som normaliserar indata före cachelagring.
  2. Agentanrop – kören anropar agent.run() med cachelagrade meddelanden och väljer automatiskt direktuppspelnings- eller icke-direktuppspelningsläge baserat på arbetsflödets körningsläge.
  3. Utdataemission – i strömningsläge genereras varje AgentResponseUpdate som en arbetsflödeshändelse. I icke-direktuppspelningsläge returneras en enda AgentResponse .
  4. Nedströmssändning – när agenten har slutfört, skickar exekutorn en AgentExecutorResponse till alla anslutna nedströms exekutorer. Det här svaret innehåller den fullständiga konversationshistoriken, vilket möjliggör sömlös länkning.
  5. Cacheåterställning – exekutörens interna meddelandecache rensas när agentens anrop görs, vilket säkerställer att varje anrop av agenten endast bearbetar nya meddelanden som tagits emot sedan det senaste anropet.

Tips/Råd

Vissa scenarier kan kräva en mer specialiserad agentutförare; till exempel använder överlämningsorkestreringar en dedicerad utförare med skräddarsydd logik för vidarebefordring.

Implicit kontra explicit skapande

WorkflowBuilder paketerar automatiskt agenten i AgentExecutor instanser när du direkt skickar en agent. För de flesta arbetsflöden räcker det med implicit skapande:

from agent_framework import WorkflowBuilder

writer_agent = client.as_agent(name="Writer", instructions="...")
reviewer_agent = client.as_agent(name="Reviewer", instructions="...")

# Agents are automatically wrapped — no manual AgentExecutor creation required
workflow = (
    WorkflowBuilder(start_executor=writer_agent)
    .add_edge(writer_agent, reviewer_agent)
    .build()
)

Explicit skapande

Skapa en AgentExecutor explicit när du behöver:

  • Dela en session mellan flera agenter.
  • Ange ett anpassat utförar-ID för routing och anpassade körningsparametrar.
  • Referera till samma exekveringsinstans i flera grafkanter.
from agent_framework import AgentExecutor

writer_executor = AgentExecutor(writer_agent, id="my-writer")
reviewer_executor = AgentExecutor(reviewer_agent, id="my-reviewer")

workflow = (
    WorkflowBuilder(start_executor=writer_executor)
    .add_edge(writer_executor, reviewer_executor)
    .build()
)

Konstruktorparametrar:

Parameter Type Beskrivning
agent SupportsAgentRun Agenten som ska omslutas.
session AgentSession \| None Session som ska användas vid agentkörningar. Om None, skapas en ny session av agenten.
id str \| None Unikt utförar-ID. Standardinställningen är agentens namn om det är tillgängligt.

Tips/Råd

Kör-ID:t är också den nyckel som används när du riktar workflow.run(function_invocation_kwargs=...) eller client_kwargs= mot enskilda agenter. Om du utelämnar idanvänder arbetsflödet den omslutna agentens namn.

Indatatyper

Definierar AgentExecutor flera hanteringsmetoder, var och en accepterar en annan indatatyp. Arbetsflödesmotorn skickar automatiskt rätt hanterare baserat på meddelandetypen. Alla indatatyper utlöser att agenten körs omedelbart, förutom AgentExecutorRequest där should_respond flaggan styr om agenten körs eller bara cachelagrar meddelandena:

Indatatyp Hanterare Utlösaragent Beskrivning
AgentExecutorRequest run Villkorlig Den kanoniska indatatypen. Innehåller en lista över meddelanden och en should_respond flagga som styr om agenten körs.
str from_str Alltid Accepterar en raw-strängprompt.
Message from_message Alltid Accepterar ett enskilt Message objekt.
list[str \| Message] from_messages Alltid Accepterar en lista med strängar eller Message objekt som konversationskontext.
AgentExecutorResponse from_response Alltid Accepterar en tidigare agentexekutors svar, vilket möjliggör direkt länkning.

Användning av AgentExecutorRequest

AgentExecutorRequest är den kanoniska indatatypen och ger mest kontroll:

from agent_framework import AgentExecutorRequest, Message

# Create a request with messages
request = AgentExecutorRequest(
    messages=[Message(role="user", contents=["Hello, world!"])],
    should_respond=True,
)

# Run the workflow
result = await workflow.run(request)

Flaggan should_respond styr om agenten bearbetar meddelandena omedelbart eller bara cachelagrar dem för senare:

  • True (standard) – agenten kör och genererar ett svar.
  • False — Meddelandena läggs till i cacheminnet men agenten körs inte. Detta är användbart för att förinläsa konversationskontexten innan du utlöser ett svar.

Utdata och länkning

När agenten är klar skickar utföraren en AgentExecutorResponse nedströms. Den här dataklassen innehåller:

Fält Type Beskrivning
executor_id str ID:t för den exekutor som genererade svaret.
agent_response AgentResponse Det underliggande agentsvaret (oförändrat från klienten).
full_conversation list[Message] \| None Den fullständiga konversationskontexten (tidigare indata + agentutdata) för länkning.

Vid kedjekoppling av agentutförare tar den underordnade utföraren emot AgentExecutorResponse via hanteraren from_response. Det använder fältet full_conversation för att bevara den fullständiga konversationshistoriken, vilket förhindrar att underordnade agenter förlorar tidigare kontext:

spam_detector = AgentExecutor(create_spam_detector_agent())
email_assistant = AgentExecutor(create_email_assistant_agent())

# The email_assistant receives the spam_detector's full conversation context
workflow = (
    WorkflowBuilder(start_executor=spam_detector)
    .add_edge(spam_detector, email_assistant)
    .build()
)

Beteende för direktuppspelning

AgentExecutor anpassas automatiskt till arbetsflödets körningsläge.

  • stream=True – anropar agent.run(stream=True) och lämnar var och en AgentResponseUpdate som en arbetsflödesevenemang. När streaming är klar aggregeras uppdateringarna till en fullständig AgentResponse för vidaresändning.
  • stream=False (standard) – anropar agent.run(stream=False) och ger en enda AgentResponse som en arbetsflödesutdatahändelse.
# Streaming mode — receive incremental updates
events = workflow.run("Write a story about a cat.", stream=True)
async for event in events:
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        print(event.data.text, end="", flush=True)

# Non-streaming mode — receive complete response
result = await workflow.run("Write a story about a cat.")

# Retrieve AgentResponse objects from the result
outputs = result.get_outputs()
for output in outputs:
    if isinstance(output, AgentResponse):
        print(output.text)

Delade sessioner

Som standard skapar var och AgentExecutor en sin egen session. Om du vill dela en session mellan flera agenter (till exempel för att underhålla en gemensam konversationstråd) skapar du en session explicit och skickar den till varje köre:

from agent_framework import AgentExecutor

# Create a shared session from one agent
shared_session = writer_agent.create_session()

# Both executors share the same session
writer_executor = AgentExecutor(writer_agent, session=shared_session)
reviewer_executor = AgentExecutor(reviewer_agent, session=shared_session)

Anmärkning

Alla agenter stöder inte delade sessioner. Vanligtvis kan endast agenter av samma providertyp dela en session.

Kontrollpunkter

AgentExecutor stöder kontrollpunkter för att spara och återställa tillstånd i långvariga arbetsflöden. När en kontrollpunkt tas serialiserar utföraren:

  • Den interna meddelandecachen.
  • Hela konversationshistoriken.
  • Agentens sessionstillstånd.
  • Pågående förfrågningar och svar för användarens inmatningar.

Vid återställning av systemet deserialiserar utföraren det här tillståndet så att arbetsflödet kan fortsätta från den aktuella platsen.

Varning

Kontrollpunkter med agenter som använder sessioner på serversidan (till exempel FoundryAgent) har begränsningar. Sessionstillståndet på serversidan samlas inte in i kontrollpunkter och kan ändras av efterföljande körningar. Överväg att implementera en anpassad exekutor om du behöver tillförlitlig kontrollpunktering med sessioner på serversidan.

Nästa steg