自定义代理

Microsoft Agent Framework 支持通过从 AIAgent 类继承并实现所需方法来构建自定义代理。

本文介绍如何构建一个简单的自定义代理,将用户输入转换为大写后重复输出。 在大多数情况下,构建自己的代理将涉及更复杂的逻辑和与 AI 服务的集成。

入门

将所需的 NuGet 包添加到项目。

dotnet add package Microsoft.Agents.AI.Abstractions --prerelease

创建自定义代理

代理会话

若要创建自定义代理,还需要一个会话,该会话用于跟踪单个会话的状态,包括消息历史记录,以及代理需要维护的任何其他状态。

为了便于入门,可以从实现常见会话存储机制的各种基类继承。

  1. InMemoryAgentSession - 将聊天历史记录存储在内存中,并且可以序列化为 JSON。
  2. ServiceIdAgentSession - 不存储任何聊天历史记录,但允许您将 ID 与会话相关联,可在其中外部存储聊天历史记录。

对于此示例,你将使用 InMemoryAgentSession 自定义会话的基类。

internal sealed class CustomAgentSession : InMemoryAgentSession
{
    internal CustomAgentSession() : base() { }
    internal CustomAgentSession(JsonElement serializedSessionState, JsonSerializerOptions? jsonSerializerOptions = null)
        : base(serializedSessionState, jsonSerializerOptions) { }
}

Agent 类

接下来,通过从 AIAgent 类继承来创建代理类本身。

internal sealed class UpperCaseParrotAgent : AIAgent
{
}

构建用户会话

会话始终通过代理类上的两个工厂方法创建。 这允许代理控制会话的创建和反序列化方式。 因此,代理可以在构造时将任何其他状态或行为附加到会话。

需要实现两种方法:

    protected override ValueTask<AgentSession> CreateSessionCoreAsync(CancellationToken cancellationToken = default) 
        => new(new CustomAgentSession());

    protected override ValueTask<AgentSession> DeserializeSessionCoreAsync(JsonElement serializedState, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
        => new(new CustomAgentSession(serializedState, jsonSerializerOptions));

核心代理逻辑

代理的核心逻辑是获取任何输入消息,将其文本转换为大写,并将其作为响应消息返回。

添加以下方法以包含此逻辑。 输入消息将被克隆,以便将输入消息的各个方面修改为有效的应答消息。 例如,角色必须更改为 Assistant

    private static IEnumerable<ChatMessage> CloneAndToUpperCase(IEnumerable<ChatMessage> messages, string agentName) => messages.Select(x =>
        {
            var messageClone = x.Clone();
            messageClone.Role = ChatRole.Assistant;
            messageClone.MessageId = Guid.NewGuid().ToString();
            messageClone.AuthorName = agentName;
            messageClone.Contents = x.Contents.Select(c => c is TextContent tc ? new TextContent(tc.Text.ToUpperInvariant())
            {
                AdditionalProperties = tc.AdditionalProperties,
                Annotations = tc.Annotations,
                RawRepresentation = tc.RawRepresentation
            } : c).ToList();
            return messageClone;
        });

代理运行方法

最后,需要实现用于运行代理的两种核心方法:一个用于非流式处理,一个用于流式处理。

对于这两种方法,需要确保提供会话,如果没有,请创建新会话。 可以检索并将消息传递给会话中的 ChatHistoryProvider。 如果不执行此作,用户将无法与代理进行多轮次对话,并且每次运行都将是一个新的交互。

    protected override async Task<AgentResponse> RunCoreAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
    {
        session ??= await this.CreateSessionAsync(cancellationToken);

        // Get existing messages from the store
        var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
        var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);

        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();

        // Notify the session of the input and output messages.
        var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
        {
            ResponseMessages = responseMessages
        };
        await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);

        return new AgentResponse
        {
            AgentId = this.Id,
            ResponseId = Guid.NewGuid().ToString(),
            Messages = responseMessages
        };
    }

    protected override async IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        session ??= await this.CreateSessionAsync(cancellationToken);

        // Get existing messages from the store
        var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
        var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);

        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();

        // Notify the session of the input and output messages.
        var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
        {
            ResponseMessages = responseMessages
        };
        await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);

        foreach (var message in responseMessages)
        {
            yield return new AgentResponseUpdate
            {
                AgentId = this.Id,
                AuthorName = this.DisplayName,
                Role = ChatRole.Assistant,
                Contents = message.Contents,
                ResponseId = Guid.NewGuid().ToString(),
                MessageId = Guid.NewGuid().ToString()
            };
        }
    }

小窍门

有关完整的可运行示例,请参阅 .NET 示例

工具

自定义 AIAgent 将具有你决定赋予它的任何工具界面。 如果你对现有的 IChatClient 进行封装,并传入 tools,就会继承该客户端的工具支持——例如,可参阅 OpenAIAzure OpenAIMicrosoft Foundry 的提供程序页面,了解底层客户端支持哪些功能。 如果自定义代理不调用聊天客户端(例如上述回显代理),则没有可调用的工具。

使用代理

AIAgent如果方法都正确实现,则代理是标准AIAgent代理并支持标准代理操作。

有关如何运行和与代理交互的详细信息,请参阅 代理入门教程

Microsoft Agent Framework 支持通过从 BaseAgent 类继承并实现所需方法来构建自定义代理。

本文档演示如何构建一个简单的自定义代理,该代理会带前缀地回显用户输入。 在大多数情况下,构建自己的代理将涉及更复杂的逻辑和与 AI 服务的集成。

入门

将所需的 Python 包添加到项目。

pip install agent-framework-core

创建自定义代理

代理协议

框架提供的协议 SupportsAgentRun 定义了所有代理必须实现的接口。 自定义代理程序可以直接实现此协议或为方便起见扩展 BaseAgent 类。

from typing import Any, Literal, overload
from collections.abc import Awaitable, Sequence
from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentSession,
    Message,
    ResponseStream,
    SupportsAgentRun,
)

class MyCustomAgent(SupportsAgentRun):
    """A custom agent that implements the SupportsAgentRun directly."""

    @property
    def id(self) -> str:
        """Returns the ID of the agent."""
        ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[False] = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse]: ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[True],
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...

    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Execute the agent and return either an awaitable response or a ResponseStream."""
        ...

小窍门

添加 @overload 签名到 run(),以便 IDE 和静态类型检查器可以根据 stream 推断返回类型(Awaitable[AgentResponse] 适用于 stream=FalseResponseStream[AgentResponseUpdate, AgentResponse] 适用于 stream=True)。

使用 BaseAgent

建议的方法是扩展 BaseAgent 类,该类提供常见功能和简化实现:

import asyncio
from collections.abc import AsyncIterable, Awaitable, Sequence
from typing import Any, Literal, overload

from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentSession,
    BaseAgent,
    Content,
    Message,
    ResponseStream,
    normalize_messages,
)


class EchoAgent(BaseAgent):
    """A simple custom agent that echoes user messages with a prefix."""

    echo_prefix: str = "Echo: "

    def __init__(
        self,
        *,
        name: str | None = None,
        description: str | None = None,
        echo_prefix: str = "Echo: ",
        **kwargs: Any,
    ) -> None:
        super().__init__(
            name=name,
            description=description,
            echo_prefix=echo_prefix,
            **kwargs,
        )

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[False] = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse]: ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[True],
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...

    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Execute the agent.

        Args:
            messages: The message(s) to process.
            stream: If True, return a ResponseStream of updates.
            session: The conversation session (optional).

        Returns:
            When stream=False: An awaitable AgentResponse.
            When stream=True: A ResponseStream with AgentResponseUpdate items and final response support.
        """
        if stream:
            return ResponseStream(
                self._run_stream(messages=messages, session=session, **kwargs),
                finalizer=AgentResponse.from_updates,
            )
        return self._run(messages=messages, session=session, **kwargs)

    async def _run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        normalized_messages = normalize_messages(messages)

        if not normalized_messages:
            response_message = Message(
                role="assistant",
                contents=[Content.from_text("Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")],
            )
        else:
            last_message = normalized_messages[-1]
            echo_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"
            response_message = Message(role="assistant", contents=[Content.from_text(echo_text)])

        if session is not None:
            stored = session.state.setdefault("memory", {}).setdefault("messages", [])
            stored.extend(normalized_messages)
            stored.append(response_message)

        return AgentResponse(messages=[response_message])

    async def _run_stream(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentResponseUpdate]:
        normalized_messages = normalize_messages(messages)

        if not normalized_messages:
            response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
        else:
            last_message = normalized_messages[-1]
            response_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"

        words = response_text.split()
        for i, word in enumerate(words):
            chunk_text = f" {word}" if i > 0 else word
            yield AgentResponseUpdate(
                contents=[Content.from_text(chunk_text)],
                role="assistant",
            )
            await asyncio.sleep(0.1)

        if session is not None:
            complete_response = Message(role="assistant", contents=[Content.from_text(response_text)])
            stored = session.state.setdefault("memory", {}).setdefault("messages", [])
            stored.extend(normalized_messages)
            stored.append(complete_response)

工具

自定义 BaseAgent 可拥有你为其指定的任意工具界面。 如果你封装了一个现有的聊天客户端,并将 tools 传递下去,你将继承该客户端的工具支持——例如,可参阅 OpenAIMicrosoft FoundryAnthropic 的提供方页面,了解底层客户端支持哪些工具。 如果自定义代理不调用聊天客户端(例如上述回显代理),则没有可调用的工具。

使用代理

如果所有代理方法都正确实现,代理支持标准操作,包括通过以下方式 ResponseStream进行流式处理:

stream = echo_agent.run("Stream this response", stream=True, session=echo_agent.create_session())
async for update in stream:
    print(update.text or "", end="", flush=True)
final_response = await stream.get_final_response()

有关如何运行和与代理交互的详细信息,请参阅 代理入门教程

后续步骤