When you add an AI agent to a workflow, it needs to be wrapped in an executor so the workflow engine can route messages to it, manage its session state, and handle its output. The Agent Executor is the built-in executor that handles this adaptation.
Overview
The Agent Executor bridges the gap between the agent abstraction and the workflow execution model. It:
- Receives typed messages from the workflow graph and forwards them to the underlying agent.
- Manages the agent's session and conversation state between runs.
- Adapts its behavior based on the workflow execution mode (streaming or non-streaming).
- Yields output events (`AgentResponse` or `AgentResponseUpdate`) to the workflow caller for observation.
- Sends messages to connected downstream executors for continued processing within the graph.
- Supports checkpointing for long-running workflows.
How It Works
In C#, the workflow engine internally creates an AIAgentHostExecutor for each AIAgent added to a workflow. This specialized executor extends ChatProtocolExecutor and uses a turn token pattern:
- Message caching — as messages arrive from other executors, the agent executor collects them. If `ForwardIncomingMessages` is enabled (the default), the incoming messages are also forwarded to downstream executors.
- Turn token trigger — the agent processes its cached messages only after receiving a `TurnToken`.
- Agent invocation — the executor calls `RunAsync` (non-streaming) or `RunStreamingAsync` (streaming) on the underlying agent.
- Output yielding — if streaming events are enabled, each incremental `AgentResponseUpdate` is yielded as a workflow output. If `EmitAgentResponseEvents` is enabled, the aggregated `AgentResponse` is also yielded as a workflow output.
- Downstream messaging — the agent's response messages are sent to connected downstream executors.
- Turn token pass-through — after completing its turn, the executor sends a new `TurnToken` downstream so that the next agent in the chain can begin processing.
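The caching-and-turn-token cycle above can be sketched in a few lines. This is a simplified, language-agnostic illustration in Python, not the framework's C# implementation; `TurnToken` and `CachingAgentExecutor` here are minimal stand-ins for the real types:

```python
from dataclasses import dataclass, field

@dataclass
class TurnToken:
    """Stand-in for the framework's TurnToken."""
    emit_events: bool = False

@dataclass
class CachingAgentExecutor:
    """Sketch of the cache-until-turn-token pattern."""
    cache: list = field(default_factory=list)
    forward_incoming: bool = True  # mirrors ForwardIncomingMessages

    def on_message(self, message, send_downstream):
        self.cache.append(message)              # 1. cache incoming messages
        if self.forward_incoming:
            send_downstream(message)            # also forward them downstream

    def on_turn_token(self, token, send_downstream):
        batch, self.cache = self.cache, []      # 2. turn token triggers the batch
        response = self.run_agent(batch)        # 3. invoke the underlying agent
        send_downstream(response)               # 4. send the response downstream
        send_downstream(TurnToken(token.emit_events))  # 5. pass the token on

    def run_agent(self, messages):
        # placeholder for the actual agent call
        return f"response to {len(messages)} message(s)"
```

Note how the executor produces nothing until the turn token arrives; this is what lets several upstream executors contribute messages to the same turn.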
Tip
Some scenarios may require a more specialized agent executor; for example, handoff orchestrations use a dedicated HandoffAgentExecutor with custom routing logic.
Implicit vs Explicit Creation
When you pass an AIAgent to WorkflowBuilder, the framework automatically wraps it in an AIAgentBinding, which creates the underlying AIAgentHostExecutor. You do not need to instantiate the agent executor directly.
AIAgent writerAgent = /* create your agent */;
AIAgent reviewerAgent = /* create your agent */;
// Agents are automatically wrapped — no manual executor creation required
var workflow = new WorkflowBuilder(writerAgent)
.AddEdge(writerAgent, reviewerAgent)
.Build();
You can also use the helper methods on AgentWorkflowBuilder for common patterns:
// Build a sequential pipeline of agents
var workflow = AgentWorkflowBuilder.BuildSequential(writerAgent, reviewerAgent);
Custom Configuration
To customize how the agent executor behaves, use BindAsExecutor with AIAgentHostOptions:
var options = new AIAgentHostOptions
{
EmitAgentUpdateEvents = true,
EmitAgentResponseEvents = true,
ReassignOtherAgentsAsUsers = true,
ForwardIncomingMessages = true,
};
ExecutorBinding writerBinding = writerAgent.BindAsExecutor(options);
var workflow = new WorkflowBuilder(writerBinding)
.AddEdge(writerBinding, reviewerAgent)
.Build();
Input Types
The agent executor in C# accepts multiple input types: string, ChatMessage, and IEnumerable<ChatMessage>. String inputs are automatically converted to ChatMessage instances with the User role. All incoming messages are accumulated until a TurnToken is received, at which point the executor processes the batch. When ReassignOtherAgentsAsUsers is enabled (the default), messages from other agents are reassigned to the User role so the underlying model treats them as user inputs, while messages from the current agent retain the Assistant role.
Output and Chaining
After the agent completes its turn, the executor:
- Sends the agent's response messages to all connected downstream executors.
- Forwards a new `TurnToken` so the next agent in the chain can begin processing.
This makes chaining agents straightforward — simply connect them with edges:
var workflow = new WorkflowBuilder(frenchTranslator)
.AddEdge(frenchTranslator, spanishTranslator)
.AddEdge(spanishTranslator, englishTranslator)
.Build();
Streaming Behavior
Streaming behavior is controlled by the EmitAgentUpdateEvents option on AIAgentHostOptions, or dynamically via the TurnToken:
- When enabled — the executor calls `RunStreamingAsync` on the agent and yields each `AgentResponseUpdate` as a workflow output event. This provides real-time token-by-token updates.
- When disabled — the executor calls `RunAsync` and produces a single complete response.
// Enable streaming events at the configuration level
var options = new AIAgentHostOptions
{
EmitAgentUpdateEvents = true,
};
// Or enable streaming dynamically via TurnToken
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
Shared Sessions
Each agent executor maintains its own session by default. To share a session between agents, configure the agents with a common session provider before adding them to the workflow.
Configuration Options
AIAgentHostOptions controls the agent executor's behavior:
| Option | Default | Description |
|---|---|---|
| `EmitAgentUpdateEvents` | `null` | Emit streaming update events during execution. `TurnToken` takes precedence if set. If both are `null`, streaming is disabled. |
| `EmitAgentResponseEvents` | `false` | Emit the aggregated agent response as a workflow output event. |
| `InterceptUserInputRequests` | `false` | Intercept `UserInputRequestContent` and route it as a workflow message for handling. |
| `InterceptUnterminatedFunctionCalls` | `false` | Intercept `FunctionCallContent` without a corresponding result and route it as a workflow message. |
| `ReassignOtherAgentsAsUsers` | `true` | Reassign messages from other agents to the User role so the model treats them as user inputs. |
| `ForwardIncomingMessages` | `true` | Forward incoming messages to downstream executors before the agent's generated messages. |
Checkpointing
The agent executor supports checkpointing for long-running workflows. When a checkpoint is taken, the executor serializes:
- The agent's session state (via `SerializeSessionAsync`).
- The current turn's event emission configuration (only present while requests are pending and the executor has not yet yielded its incoming `TurnToken`).
- Any pending user input requests and function call requests.
On restore, the executor deserializes the session and pending request state, allowing the workflow to resume from where it left off.
How It Works
The AgentExecutor class wraps an agent that implements the SupportsAgentRun protocol. When the executor receives a message:
- Message normalization — the input is normalized into a list of `Message` objects and added to the executor's internal cache. The executor accepts multiple input types — `str`, `Message`, `list[str | Message]`, `AgentExecutorRequest`, and `AgentExecutorResponse` — each routed to a dedicated handler that normalizes the input before caching.
- Agent invocation — the executor calls `agent.run()` with the cached messages, automatically selecting streaming or non-streaming mode based on the workflow execution mode.
- Output emission — in streaming mode, each `AgentResponseUpdate` is yielded as a workflow output event. In non-streaming mode, a single `AgentResponse` is yielded.
- Downstream dispatch — after the agent completes, the executor sends an `AgentExecutorResponse` to all connected downstream executors. This response includes the full conversation history, enabling seamless chaining.
- Cache reset — the executor's internal message cache is cleared after the agent is invoked, ensuring that each agent invocation processes only new messages received since the last invocation.
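The normalize-invoke-reset cycle above can be sketched with a minimal stand-in class. `ExecutorSketch` and its dict-shaped response are illustrative only; the real `AgentExecutor` dispatches through the workflow engine and returns a proper `AgentExecutorResponse`:

```python
from dataclasses import dataclass, field

@dataclass
class Message:
    role: str
    text: str

@dataclass
class ExecutorSketch:
    """Minimal stand-in for AgentExecutor's normalize/invoke/reset cycle."""
    cache: list = field(default_factory=list)

    def receive(self, item):
        # str inputs become user messages, mirroring the from_str handler
        msg = Message("user", item) if isinstance(item, str) else item
        self.cache.append(msg)

    def invoke(self):
        batch, self.cache = self.cache, []   # cache is cleared per invocation
        reply = Message("assistant", f"reply to {len(batch)} message(s)")
        # the downstream response carries the full conversation for chaining
        return {"agent_response": reply, "full_conversation": batch + [reply]}
```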
Tip
Some scenarios may require a more specialized agent executor; for example, handoff orchestrations use a dedicated executor with custom routing logic.
Implicit vs Explicit Creation
The WorkflowBuilder automatically wraps agents in AgentExecutor instances when you pass an agent directly. For most workflows, implicit creation is sufficient:
from agent_framework import WorkflowBuilder
writer_agent = client.as_agent(name="Writer", instructions="...")
reviewer_agent = client.as_agent(name="Reviewer", instructions="...")
# Agents are automatically wrapped — no manual AgentExecutor creation required
workflow = (
WorkflowBuilder(start_executor=writer_agent)
.add_edge(writer_agent, reviewer_agent)
.build()
)
Explicit Creation
Create an AgentExecutor explicitly when you need to:
- Share a session between multiple agents.
- Provide a custom executor ID.
- Reference the same executor instance in multiple edges.
from agent_framework import AgentExecutor
writer_executor = AgentExecutor(writer_agent, id="my-writer")
reviewer_executor = AgentExecutor(reviewer_agent, id="my-reviewer")
workflow = (
WorkflowBuilder(start_executor=writer_executor)
.add_edge(writer_executor, reviewer_executor)
.build()
)
Constructor parameters:
| Parameter | Type | Description |
|---|---|---|
| `agent` | `SupportsAgentRun` | The agent to wrap. |
| `session` | `AgentSession \| None` | Session to use for agent runs. If `None`, a new session is created from the agent. |
| `id` | `str \| None` | Unique executor ID. Defaults to the agent's name if available. |
Input Types
The AgentExecutor defines multiple handler methods, each accepting a different input type. The workflow engine automatically dispatches the correct handler based on the message type. All input types trigger the agent to run immediately, except for AgentExecutorRequest where the should_respond flag controls whether the agent runs or simply caches the messages:
| Input Type | Handler | Triggers Agent | Description |
|---|---|---|---|
| `AgentExecutorRequest` | `run` | Conditional | The canonical input type. Contains a list of messages and a `should_respond` flag that controls whether the agent runs. |
| `str` | `from_str` | Always | Accepts a raw string prompt. |
| `Message` | `from_message` | Always | Accepts a single `Message` object. |
| `list[str \| Message]` | `from_messages` | Always | Accepts a list of strings or `Message` objects as conversation context. |
| `AgentExecutorResponse` | `from_response` | Always | Accepts a prior agent executor's response, enabling direct chaining. |
Using AgentExecutorRequest
AgentExecutorRequest is the canonical input type and provides the most control:
from agent_framework import AgentExecutorRequest, Message
# Create a request with messages
request = AgentExecutorRequest(
messages=[Message("user", text="Hello, world!")],
should_respond=True,
)
# Run the workflow
result = await workflow.run(request)
The should_respond flag controls whether the agent processes the messages immediately or simply caches them for later:
- `True` (default) — the agent runs and produces a response.
- `False` — the messages are added to the cache but the agent does not run. This is useful for preloading conversation context before triggering a response.
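The preload-then-trigger behavior of `should_respond` can be sketched with simplified stand-ins. `Request` and `Gate` below are illustrative types, not framework APIs; they model only the gating logic:

```python
from dataclasses import dataclass, field

@dataclass
class Request:
    """Stand-in for AgentExecutorRequest."""
    messages: list
    should_respond: bool = True

@dataclass
class Gate:
    """Sketch of how should_respond gates the agent invocation."""
    cache: list = field(default_factory=list)

    def handle(self, req: Request):
        self.cache.extend(req.messages)      # messages are always cached
        if not req.should_respond:
            return None                      # agent does not run yet
        batch, self.cache = self.cache, []
        return f"agent ran on {len(batch)} message(s)"
```

Sending two requests — one with `should_respond=False` to load background context, then one with `should_respond=True` — makes the agent answer with the full accumulated context in a single run.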
Output and Chaining
After the agent completes, the executor sends an AgentExecutorResponse downstream. This dataclass contains:
| Field | Type | Description |
|---|---|---|
| `executor_id` | `str` | The ID of the executor that produced the response. |
| `agent_response` | `AgentResponse` | The underlying agent response (unaltered from the client). |
| `full_conversation` | `list[Message] \| None` | The full conversation context (prior inputs + agent outputs) for chaining. |
When chaining agent executors, the downstream executor receives the AgentExecutorResponse via the from_response handler. It uses the full_conversation field to preserve the complete conversation history, preventing downstream agents from losing prior context:
spam_detector = AgentExecutor(create_spam_detector_agent())
email_assistant = AgentExecutor(create_email_assistant_agent())
# The email_assistant receives the spam_detector's full conversation context
workflow = (
WorkflowBuilder(start_executor=spam_detector)
.add_edge(spam_detector, email_assistant)
.build()
)
Streaming Behavior
The AgentExecutor automatically adapts to the workflow execution mode:
- `stream=True` — calls `agent.run(stream=True)` and yields each `AgentResponseUpdate` as a workflow output event. After streaming completes, the updates are aggregated into a full `AgentResponse` for downstream dispatch.
- `stream=False` (default) — calls `agent.run(stream=False)` and yields a single `AgentResponse` as a workflow output event.
# Streaming mode — receive incremental updates
events = workflow.run("Write a story about a cat.", stream=True)
async for event in events:
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
print(event.data.text, end="", flush=True)
# Non-streaming mode — receive complete response
result = await workflow.run("Write a story about a cat.")
# Retrieve AgentResponse objects from the result
outputs = result.get_outputs()
for output in outputs:
if isinstance(output, AgentResponse):
print(output.text)
Shared Sessions
By default, each AgentExecutor creates its own session. To share a session between multiple agents (for example, to maintain a common conversation thread), create a session explicitly and pass it to each executor:
from agent_framework import AgentExecutor
# Create a shared session from one agent
shared_session = writer_agent.create_session()
# Both executors share the same session
writer_executor = AgentExecutor(writer_agent, session=shared_session)
reviewer_executor = AgentExecutor(reviewer_agent, session=shared_session)
Note
Not all agents support shared sessions. Typically, only agents of the same provider type can share a session.
Checkpointing
The AgentExecutor supports checkpointing for saving and restoring state in long-running workflows. When a checkpoint is taken, the executor serializes:
- The internal message cache.
- The full conversation history.
- The agent session state.
- Any pending user input requests and responses.
On restore, the executor deserializes this state, allowing the workflow to resume from where it left off.
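The serialize-and-restore round trip can be sketched as follows. `ExecutorState` and the plain-JSON helpers are illustrative stand-ins; the real executor serializes through the framework's checkpoint storage, not raw JSON:

```python
import json
from dataclasses import dataclass, field

@dataclass
class ExecutorState:
    """Stand-in for the state an AgentExecutor checkpoints."""
    message_cache: list = field(default_factory=list)
    full_conversation: list = field(default_factory=list)
    session_state: dict = field(default_factory=dict)
    pending_requests: list = field(default_factory=list)

def take_checkpoint(state: ExecutorState) -> str:
    # serialize every field so a later process can rebuild the executor
    return json.dumps(state.__dict__)

def restore_checkpoint(blob: str) -> ExecutorState:
    # rebuild the executor state from the stored snapshot
    return ExecutorState(**json.loads(blob))
```

Because everything the executor needs is in the snapshot, the restored workflow continues from the exact point the checkpoint captured.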
Warning
Checkpointing with agents that use server-side sessions (such as AzureAIAgentClient) has limitations. Server-side session state is not captured in checkpoints and can be modified by subsequent runs. Consider implementing a custom executor if you need reliable checkpointing with server-side sessions.