通过


代理执行程序

将 AI 代理添加到工作流时,需要将其包装在执行器中,以便工作流引擎可以将消息路由到工作流、管理其会话状态以及处理其输出。 代理执行程序是处理此适应的内置执行程序。

概述

代理执行程序弥合了代理抽象与工作流执行模型之间的差距。 该方法:

  • 从工作流图接收类型化消息,并将其转发到基础代理。
  • 管理代理在不同运行之间的会话和对话状态。
  • 根据工作流执行模式(流式处理或非流式处理)调整其行为。
  • 向工作流调用方生成输出事件(AgentResponseAgentResponseUpdate),供其观察。
  • 将消息发送到连接的下游执行程序,以便在图形中继续处理。
  • 支持为长时间运行的工作流创建检查点。

工作原理

在 C# 中,工作流引擎会在内部为添加到工作流的每个AIAgentHostExecutor创建一个AIAgent。 此专用执行程序扩展 ChatProtocolExecutor 并使用 轮次令牌 模式:

  1. 消息缓存 - 当消息从其他执行程序到达时,代理执行程序会收集它们。 如果 ForwardIncomingMessages 已启用(默认值),则传入消息也会转发到下游执行程序。
  2. 轮次令牌触发器 — 代理仅在收到 TurnToken令牌后处理其缓存的消息。
  3. 代理调用 — 执行程序对基础代理调用 RunAsync (非流式处理)或 RunStreamingAsync (流式处理)。
  4. 输出生成 - 如果启用了流式处理事件,则会生成每个增量 AgentResponseUpdate 作为工作流输出。 如果 EmitAgentResponseEvents 已启用,则聚合 AgentResponse 也会作为工作流输出生成。
  5. 下游消息传送 - 代理的响应消息将发送到连接的下游执行程序。
  6. 轮次令牌传递 - 完成轮次后,执行器发送新的TurnToken到下游,以便链中的下一个代理可以开始运算。

小窍门

某些方案可能需要更专用的代理执行程序;例如, 交接业务流程 使用专用 HandoffAgentExecutor 的自定义路由逻辑。

隐式创建与显式创建

传递AIAgentWorkflowBuilder时,框架会自动将其包装在AIAgentBinding中,从而创建底层的AIAgentHostExecutor。 无需直接实例化代理执行程序。

AIAgent writerAgent = /* create your agent */;
AIAgent reviewerAgent = /* create your agent */;

// Agents are automatically wrapped — no manual executor creation required
var workflow = new WorkflowBuilder(writerAgent)
    .AddEdge(writerAgent, reviewerAgent)
    .Build();

可以使用 AgentWorkflowBuilder 上的辅助方法来实现常见模式:

// Build a sequential pipeline of agents
var workflow = AgentWorkflowBuilder.BuildSequential(writerAgent, reviewerAgent);

自定义配置

若要自定义代理执行程序的行为方式,请使用 BindAsExecutorAIAgentHostOptions

var options = new AIAgentHostOptions
{
    EmitAgentUpdateEvents = true,
    EmitAgentResponseEvents = true,
    ReassignOtherAgentsAsUsers = true,
    ForwardIncomingMessages = true,
};

ExecutorBinding writerBinding = writerAgent.BindAsExecutor(options);
var workflow = new WorkflowBuilder(writerBinding)
    .AddEdge(writerBinding, reviewerAgent)
    .Build();

输入类型

C# 中的代理执行程序接受多个输入类型: stringChatMessageIEnumerable<ChatMessage>。 字符串输入会自动转换为具有 ChatMessage 角色的 User 实例。 所有传入的消息都会累积,直到收到TurnToken为止,此时执行器会处理消息批次。 启用时 ReassignOtherAgentsAsUsers (默认),其他代理的消息将重新分配给 User 角色,以便基础模型将它们视为用户输入,而来自当前代理的消息将保留该 Assistant 角色。

输出和链式调用

代理完成轮次后,执行者:

  1. 将代理的响应消息发送到所有连接的下游执行程序。
  2. 转发新的 TurnToken,以便链中的下一个代理可以开始处理。

这使得链接代理非常简单 , 只需使用边缘连接它们:

var workflow = new WorkflowBuilder(frenchTranslator)
    .AddEdge(frenchTranslator, spanishTranslator)
    .AddEdge(spanishTranslator, englishTranslator)
    .Build();

流式处理行为

流式处理行为由 EmitAgentUpdateEvents 选项在 AIAgentHostOptions 上控制,或通过 TurnToken 动态控制。

  • 启用后 , 执行程序对代理调用 RunStreamingAsync 并生成每个 AgentResponseUpdate 代理作为工作流输出事件。 这提供逐个令牌的实时更新。
  • 禁用时 , 执行程序调用 RunAsync 并生成单个完整响应。
// Enable streaming events at the configuration level
var options = new AIAgentHostOptions
{
    EmitAgentUpdateEvents = true,
};

// Or enable streaming dynamically via TurnToken
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

共享会话

默认情况下,每个代理执行程序维护自己的会话。 若要在代理之间共享会话,请在将代理添加到工作流之前使用通用会话提供程序配置代理。

配置选项

AIAgentHostOptions 控制代理执行程序的行为:

选项 默认 说明
EmitAgentUpdateEvents null 在执行期间发出流式更新事件。 TurnToken 如果已设置,则优先。 如果两者都是null,则禁用流式传输。
EmitAgentResponseEvents false 以工作流输出事件的形式发出聚合代理响应。
InterceptUserInputRequests false 截获 UserInputRequestContent 并将其作为工作流消息进行路由处理。
InterceptUnterminatedFunctionCalls false 不带相应结果的截获 FunctionCallContent 并将其路由为工作流消息。
ReassignOtherAgentsAsUsers true 将来自其他代理 User 的消息重新分配给角色,以便模型将其视为用户输入。
ForwardIncomingMessages true 在代理生成消息之前,将传入消息转发到下游执行器。

检查点

代理执行程序支持检查点功能,以便对长期运行的工作流进行处理。 当进行检查点时,执行器会进行序列化:

  • 代理的会话状态(通过 SerializeSessionAsync)。
  • 当前轮次的事件排放配置(仅在请求挂起且执行程序尚未生成其传入 TurnToken时存在)。
  • 任何挂起的用户输入请求和函数调用请求。

还原时,执行器将反序列化会话和挂起的请求状态,从而允许工作流从其离开的位置恢复。

工作原理

AgentExecutor 类封装了一个实现 SupportsAgentRun 协议的代理。 执行者收到消息时:

  1. 消息规范化 - 输入规范化为对象列表 Message ,并添加到执行程序的内部缓存中。 执行程序接受多个输入类型(strMessagelist[str | Message]AgentExecutorRequestAgentExecutorResponse),每个类型都路由到一个专用处理程序,该处理程序在缓存之前规范化输入。
  2. 代理调用 — 执行程序使用缓存的消息调用 agent.run() ,根据工作流执行模式自动选择流模式或非流模式。
  3. 输出排放 - 在流式处理模式下,每个 AgentResponseUpdate 输出都作为工作流输出事件生成。 在非流式处理模式下,会输出一个 AgentResponse
  4. 下游调度 - 代理完成后,执行程序会向所有连接的下游执行程序发送一个 AgentExecutorResponse 。 此响应包括完整的对话历史记录,可实现无缝链接。
  5. 缓存重置 - 执行程序的内部消息缓存在调用代理后清除,确保每个代理调用仅处理自上次调用以来收到的新消息。

小窍门

某些方案可能需要更专用的代理执行程序;例如, 交接业务流程 使用具有自定义路由逻辑的专用执行程序。

隐式创建与显式创建

直接传递代理时,WorkflowBuilder 会自动将代理包装在 AgentExecutor 实例中。 对于大多数工作流,隐式创建就足够了:

from agent_framework import WorkflowBuilder

writer_agent = client.as_agent(name="Writer", instructions="...")
reviewer_agent = client.as_agent(name="Reviewer", instructions="...")

# Agents are automatically wrapped — no manual AgentExecutor creation required
workflow = (
    WorkflowBuilder(start_executor=writer_agent)
    .add_edge(writer_agent, reviewer_agent)
    .build()
)

显式创建

在需要以下情况下显式创建:AgentExecutor

  • 在多个代理之间共享会话。
  • 为路由和目标运行时的 kwargs 提供自定义执行器 ID。
  • 引用多个边缘中的同一执行程序实例。
from agent_framework import AgentExecutor

writer_executor = AgentExecutor(writer_agent, id="my-writer")
reviewer_executor = AgentExecutor(reviewer_agent, id="my-reviewer")

workflow = (
    WorkflowBuilder(start_executor=writer_executor)
    .add_edge(writer_executor, reviewer_executor)
    .build()
)

构造函数参数:

参数 类型 说明
agent SupportsAgentRun 要包装的代理。
session AgentSession \| None 用于代理运行的会话。 如果 None,则由代理创建新会话。
id str \| None 唯一执行器 ID。 如果可用,则默认为代理的名称。

小窍门

执行程序 ID 也是当你将 workflow.run(function_invocation_kwargs=...)client_kwargs= 用于定位单个代理时使用的密钥。 如果省略 id,工作流将使用封装代理的名称。

输入类型

定义 AgentExecutor 多个处理程序方法,每个方法都接受不同的输入类型。 工作流引擎根据消息类型自动调度正确的处理程序。 除了AgentExecutorRequest之外,所有输入类型都会触发代理立即运行,其中should_respond标志控制代理是运行还是仅缓存消息。

输入类型 处理器 触发器代理 说明
AgentExecutorRequest run 有條件的 规范输入类型。 包含消息列表和一个 should_respond 标志,用于控制代理是否运行。
str from_str 始终 接受一个原始字符串提示。
Message from_message 始终 接受单个 Message 对象。
list[str \| Message] from_messages 始终 接受字符串或 Message 对象列表作为对话上下文。
AgentExecutorResponse from_response 始终 接受先前代理执行器的响应,实现直接链式调用。

使用 AgentExecutorRequest

AgentExecutorRequest 是规范输入类型,提供最多的控件:

from agent_framework import AgentExecutorRequest, Message

# Create a request with messages
request = AgentExecutorRequest(
    messages=[Message(role="user", contents=["Hello, world!"])],
    should_respond=True,
)

# Run the workflow
result = await workflow.run(request)

should_respond 标志控制代理是立即处理消息,还是只是缓存消息以供以后使用:

  • True (默认值) - 代理运行并生成响应。
  • False — 消息将添加到缓存,但代理未运行。 这对于在触发响应之前预加载会话上下文非常有用。

输出和链式调用

代理完成后,执行程序将发送 AgentExecutorResponse 到下游。 此数据类包含:

领域 类型 说明
executor_id str 生成响应的执行程序的 ID。
agent_response AgentResponse 原始代理响应(未被客户端更改)。
full_conversation list[Message] \| None 用于链式处理的完整会话上下文(先前的输入 + 代理输出)。

链式代理执行器时,下游执行器通过 AgentExecutorResponse 处理程序接收 from_response。 它使用 full_conversation 字段来保留完整的会话历史记录,防止下游代理丢失以前的上下文:

spam_detector = AgentExecutor(create_spam_detector_agent())
email_assistant = AgentExecutor(create_email_assistant_agent())

# The email_assistant receives the spam_detector's full conversation context
workflow = (
    WorkflowBuilder(start_executor=spam_detector)
    .add_edge(spam_detector, email_assistant)
    .build()
)

流式处理行为

AgentExecutor 自动适应工作流执行模式。

  • stream=True — 调用 agent.run(stream=True) 并生成每个 AgentResponseUpdate 事件作为工作流输出事件。 流式处理完成后,更新将聚合成完整的 AgentResponse 以供下游发送。
  • stream=False (默认值) - 调用 agent.run(stream=False) 并生成单个 AgentResponse 作为工作流输出事件。
# Streaming mode — receive incremental updates
events = workflow.run("Write a story about a cat.", stream=True)
async for event in events:
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        print(event.data.text, end="", flush=True)

# Non-streaming mode — receive complete response
result = await workflow.run("Write a story about a cat.")

# Retrieve AgentResponse objects from the result
outputs = result.get_outputs()
for output in outputs:
    if isinstance(output, AgentResponse):
        print(output.text)

共享会话

默认情况下,每个 AgentExecutor 会创建自己的会话。 若要在多个代理之间共享会话(例如,若要维护公共会话线程),请显式创建会话并将其传递给每个执行程序:

from agent_framework import AgentExecutor

# Create a shared session from one agent
shared_session = writer_agent.create_session()

# Both executors share the same session
writer_executor = AgentExecutor(writer_agent, session=shared_session)
reviewer_executor = AgentExecutor(reviewer_agent, session=shared_session)

注释

并非所有代理都支持共享会话。 通常,只有同一提供程序类型的代理可以共享会话。

检查点

在长时间运行的工作流中,AgentExecutor 支持检查点以保存和还原状态。 当进行检查点时,执行器会进行序列化:

  • 内部消息缓存。
  • 完整的对话历史记录。
  • 代理会话状态。
  • 任何挂起的用户输入请求和响应。

在还原时,执行程序会反序列化此状态,从而允许工作流从其离开位置恢复。

警告

使用服务器端会话(例如 FoundryAgent)的代理程序在进行检查点时存在限制。 服务器端会话状态未在检查点中捕获,可由后续运行修改。 如果需要使用服务器端会话进行可靠的检查点机制,请考虑实现自定义执行器。

后续步骤