Microsoft Agent Framework 支持通过从 AIAgent 类继承并实现所需方法来构建自定义代理。
本文介绍如何构建一个简单的自定义代理,将用户输入转换为大写后重复输出。 在大多数情况下,构建自己的代理将涉及更复杂的逻辑和与 AI 服务的集成。
入门
将所需的 NuGet 包添加到项目。
dotnet add package Microsoft.Agents.AI.Abstractions --prerelease
创建自定义代理
代理会话
若要创建自定义代理,还需要一个会话,该会话用于跟踪单个会话的状态,包括消息历史记录,以及代理需要维护的任何其他状态。
为了便于入门,可以从实现常见会话存储机制的各种基类继承。
-
InMemoryAgentSession- 将聊天历史记录存储在内存中,并且可以序列化为 JSON。 -
ServiceIdAgentSession- 不存储任何聊天历史记录,但允许您将 ID 与会话相关联,可在其中外部存储聊天历史记录。
对于此示例,你将使用 InMemoryAgentSession 自定义会话的基类。
internal sealed class CustomAgentSession : InMemoryAgentSession
{
internal CustomAgentSession() : base() { }
internal CustomAgentSession(JsonElement serializedSessionState, JsonSerializerOptions? jsonSerializerOptions = null)
: base(serializedSessionState, jsonSerializerOptions) { }
}
Agent 类
接下来,通过从 AIAgent 类继承来创建代理类本身。
internal sealed class UpperCaseParrotAgent : AIAgent
{
}
构建用户会话
会话始终通过代理类上的两个工厂方法创建。 这允许代理控制会话的创建和反序列化方式。 因此,代理可以在构造时将任何其他状态或行为附加到会话。
需要实现两种方法:
public override Task<AgentSession> CreateSessionAsync(CancellationToken cancellationToken = default)
=> Task.FromResult<AgentSession>(new CustomAgentSession());
public override Task<AgentSession> DeserializeSessionAsync(JsonElement serializedSession, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
=> Task.FromResult<AgentSession>(new CustomAgentSession(serializedSession, jsonSerializerOptions));
核心代理逻辑
代理的核心逻辑是获取任何输入消息,将其文本转换为大写,并将其作为响应消息返回。
添加以下方法以包含此逻辑。
输入消息将被克隆,以便将输入消息的各个方面修改为有效的应答消息。 例如,角色必须更改为 Assistant。
private static IEnumerable<ChatMessage> CloneAndToUpperCase(IEnumerable<ChatMessage> messages, string agentName) => messages.Select(x =>
{
var messageClone = x.Clone();
messageClone.Role = ChatRole.Assistant;
messageClone.MessageId = Guid.NewGuid().ToString();
messageClone.AuthorName = agentName;
messageClone.Contents = x.Contents.Select(c => c is TextContent tc ? new TextContent(tc.Text.ToUpperInvariant())
{
AdditionalProperties = tc.AdditionalProperties,
Annotations = tc.Annotations,
RawRepresentation = tc.RawRepresentation
} : c).ToList();
return messageClone;
});
代理运行方法
最后,需要实现用于运行代理的两种核心方法:一个用于非流式处理,一个用于流式处理。
对于这两种方法,需要确保提供会话,如果没有,请创建新会话。
可以检索并将消息传递给会话中的 ChatHistoryProvider。
如果不执行此作,用户将无法与代理进行多轮次对话,并且每次运行都将是一个新的交互。
public override async Task<AgentResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
session ??= await this.CreateSessionAsync(cancellationToken);
// Get existing messages from the store
var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);
List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();
// Notify the session of the input and output messages.
var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
{
ResponseMessages = responseMessages
};
await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);
return new AgentResponse
{
AgentId = this.Id,
ResponseId = Guid.NewGuid().ToString(),
Messages = responseMessages
};
}
public override async IAsyncEnumerable<AgentResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
session ??= await this.CreateSessionAsync(cancellationToken);
// Get existing messages from the store
var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);
List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();
// Notify the session of the input and output messages.
var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
{
ResponseMessages = responseMessages
};
await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);
foreach (var message in responseMessages)
{
yield return new AgentResponseUpdate
{
AgentId = this.Id,
AuthorName = this.DisplayName,
Role = ChatRole.Assistant,
Contents = message.Contents,
ResponseId = Guid.NewGuid().ToString(),
MessageId = Guid.NewGuid().ToString()
};
}
}
小窍门
有关完整的可运行示例,请参阅 .NET 示例 。
使用代理
AIAgent如果方法都正确实现,则代理是标准AIAgent代理并支持标准代理操作。
有关如何运行和与代理交互的详细信息,请参阅 代理入门教程。
Microsoft Agent Framework 支持通过从 BaseAgent 类继承并实现所需方法来构建自定义代理。
本文档演示如何构建一个简单的自定义代理,该代理会带前缀地回显用户输入。 在大多数情况下,构建自己的代理将涉及更复杂的逻辑和与 AI 服务的集成。
入门
将所需的 Python 包添加到项目。
pip install agent-framework-core --pre
创建自定义代理
代理协议
框架提供的协议 SupportsAgentRun 定义了所有代理必须实现的接口。 自定义代理程序可以直接实现此协议或为方便起见扩展 BaseAgent 类。
from typing import Any, Literal, overload
from collections.abc import Awaitable, Sequence
from agent_framework import (
AgentResponse,
AgentResponseUpdate,
AgentSession,
Message,
ResponseStream,
SupportsAgentRun,
)
class MyCustomAgent(SupportsAgentRun):
"""A custom agent that implements the SupportsAgentRun directly."""
@property
def id(self) -> str:
"""Returns the ID of the agent."""
...
@overload
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: Literal[False] = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse]: ...
@overload
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: Literal[True],
session: AgentSession | None = None,
**kwargs: Any,
) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: bool = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
"""Execute the agent and return either an awaitable response or a ResponseStream."""
...
小窍门
添加 @overload 签名到 run(),以便 IDE 和静态类型检查器可以根据 stream 推断返回类型(Awaitable[AgentResponse] 适用于 stream=False,ResponseStream[AgentResponseUpdate, AgentResponse] 适用于 stream=True)。
使用 BaseAgent
建议的方法是扩展 BaseAgent 类,该类提供常见功能和简化实现:
import asyncio
from collections.abc import AsyncIterable, Awaitable, Sequence
from typing import Any, Literal, overload
from agent_framework import (
AgentResponse,
AgentResponseUpdate,
AgentSession,
BaseAgent,
Content,
Message,
ResponseStream,
normalize_messages,
)
class EchoAgent(BaseAgent):
"""A simple custom agent that echoes user messages with a prefix."""
echo_prefix: str = "Echo: "
def __init__(
self,
*,
name: str | None = None,
description: str | None = None,
echo_prefix: str = "Echo: ",
**kwargs: Any,
) -> None:
super().__init__(
name=name,
description=description,
echo_prefix=echo_prefix,
**kwargs,
)
@overload
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: Literal[False] = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse]: ...
@overload
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: Literal[True],
session: AgentSession | None = None,
**kwargs: Any,
) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: bool = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
"""Execute the agent.
Args:
messages: The message(s) to process.
stream: If True, return a ResponseStream of updates.
session: The conversation session (optional).
Returns:
When stream=False: An awaitable AgentResponse.
When stream=True: A ResponseStream with AgentResponseUpdate items and final response support.
"""
if stream:
return ResponseStream(
self._run_stream(messages=messages, session=session, **kwargs),
finalizer=AgentResponse.from_updates,
)
return self._run(messages=messages, session=session, **kwargs)
async def _run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
session: AgentSession | None = None,
**kwargs: Any,
) -> AgentResponse:
normalized_messages = normalize_messages(messages)
if not normalized_messages:
response_message = Message(
role="assistant",
contents=[Content.from_text("Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")],
)
else:
last_message = normalized_messages[-1]
echo_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"
response_message = Message(role="assistant", contents=[Content.from_text(echo_text)])
if session is not None:
stored = session.state.setdefault("memory", {}).setdefault("messages", [])
stored.extend(normalized_messages)
stored.append(response_message)
return AgentResponse(messages=[response_message])
async def _run_stream(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
session: AgentSession | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentResponseUpdate]:
normalized_messages = normalize_messages(messages)
if not normalized_messages:
response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
else:
last_message = normalized_messages[-1]
response_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"
words = response_text.split()
for i, word in enumerate(words):
chunk_text = f" {word}" if i > 0 else word
yield AgentResponseUpdate(
contents=[Content.from_text(chunk_text)],
role="assistant",
)
await asyncio.sleep(0.1)
if session is not None:
complete_response = Message(role="assistant", contents=[Content.from_text(response_text)])
stored = session.state.setdefault("memory", {}).setdefault("messages", [])
stored.extend(normalized_messages)
stored.append(complete_response)
使用代理
如果所有代理方法都正确实现,代理支持标准操作,包括通过以下方式 ResponseStream进行流式处理:
stream = echo_agent.run("Stream this response", stream=True, session=echo_agent.create_session())
async for update in stream:
print(update.text or "", end="", flush=True)
final_response = await stream.get_final_response()
有关如何运行和与代理交互的详细信息,请参阅 代理入门教程。