自訂代理程式

Microsoft Agent Framework 支援透過從類別 AIAgent 繼承並實作必要的方法來建立自訂代理。

本文說明如何建立一個簡單的自訂代理程式,並以大寫字母形式回應使用者的輸入。 在大多數情況下,建立自己的代理程式將涉及更複雜的邏輯以及與 AI 服務的整合。

使用者入門

將必要的 NuGet 套件新增至您的專案。

dotnet add package Microsoft.Agents.AI.Abstractions --prerelease

建立自訂代理人

代理人會議

要建立自訂代理,你還需要一個會話,用來追蹤單一對話的狀態,包括訊息歷史,以及代理需要維持的其他狀態。

為了方便上手,你可以繼承自各種實作共通會話儲存機制的基底類別。

  1. InMemoryAgentSession - 將聊天記錄儲存在記憶體中,並可序列化為 JSON。
  2. ServiceIdAgentSession - 不儲存聊天紀錄,但允許你將 ID 與會話關聯,並可將聊天紀錄儲存在外部。

在這個例子中,你會將 InMemoryAgentSession 用作自訂會話的基底類別。

internal sealed class CustomAgentSession : InMemoryAgentSession
{
    internal CustomAgentSession() : base() { }
    internal CustomAgentSession(JsonElement serializedSessionState, JsonSerializerOptions? jsonSerializerOptions = null)
        : base(serializedSessionState, jsonSerializerOptions) { }
}

Agent 類別

接著,透過從 AIAgent 類別繼承,建立代理程式類別本身。

internal sealed class UpperCaseParrotAgent : AIAgent
{
}

建構工作階段

Sessions 總是透過代理程式類別上的兩個工廠方法建立。 這使代理可以控制會話的創建和反序列化過程。 因此,代理在建構時可以附加任何額外的狀態或行為給會話。

需要實作兩種方法:

    protected override ValueTask<AgentSession> CreateSessionCoreAsync(CancellationToken cancellationToken = default) 
        => new(new CustomAgentSession());

    protected override ValueTask<AgentSession> DeserializeSessionCoreAsync(JsonElement serializedState, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
        => new(new CustomAgentSession(serializedState, jsonSerializerOptions));

核心代理程式邏輯

代理的核心邏輯是接收任何輸入訊息,將其文字轉換為大寫,並以回應訊息的形式回傳。

新增以下方法來包含此邏輯。 輸入訊息會被複製,因為輸入訊息的各個面向必須修改才能成為有效的回應訊息。 例如,角色必須改為 Assistant

    private static IEnumerable<ChatMessage> CloneAndToUpperCase(IEnumerable<ChatMessage> messages, string agentName) => messages.Select(x =>
        {
            var messageClone = x.Clone();
            messageClone.Role = ChatRole.Assistant;
            messageClone.MessageId = Guid.NewGuid().ToString();
            messageClone.AuthorName = agentName;
            messageClone.Contents = x.Contents.Select(c => c is TextContent tc ? new TextContent(tc.Text.ToUpperInvariant())
            {
                AdditionalProperties = tc.AdditionalProperties,
                Annotations = tc.Annotations,
                RawRepresentation = tc.RawRepresentation
            } : c).ToList();
            return messageClone;
        });

代理程式執行方法

最後,您需要實作兩個核心方法來執行代理程式:一種是非串流模式,另一種是串流模式。

兩種方法都需要確保有提供會話,如果沒有,就建立一個新的會話。 訊息可以在會話中被檢索,並傳遞給ChatHistoryProvider。 如果不這麼做,使用者將無法與代理進行多回合對話,每次執行都會是全新的互動。

    protected override async Task<AgentResponse> RunCoreAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
    {
        session ??= await this.CreateSessionAsync(cancellationToken);

        // Get existing messages from the store
        var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
        var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);

        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();

        // Notify the session of the input and output messages.
        var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
        {
            ResponseMessages = responseMessages
        };
        await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);

        return new AgentResponse
        {
            AgentId = this.Id,
            ResponseId = Guid.NewGuid().ToString(),
            Messages = responseMessages
        };
    }

    protected override async IAsyncEnumerable<AgentResponseUpdate> RunCoreStreamingAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        session ??= await this.CreateSessionAsync(cancellationToken);

        // Get existing messages from the store
        var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
        var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);

        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();

        // Notify the session of the input and output messages.
        var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
        {
            ResponseMessages = responseMessages
        };
        await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);

        foreach (var message in responseMessages)
        {
            yield return new AgentResponseUpdate
            {
                AgentId = this.Id,
                AuthorName = this.DisplayName,
                Role = ChatRole.Assistant,
                Contents = message.Contents,
                ResponseId = Guid.NewGuid().ToString(),
                MessageId = Guid.NewGuid().ToString()
            };
        }
    }

小提示

完整可執行範例請參閱 .NET 範例

Tools

自訂的 AIAgent 具有你決定賦予它的任何工具表面。 如果你包裝現有的 ,並傳入 ,就會繼承該客戶端的工具支援能力——例如,可參閱 OpenAIAzure OpenAIMicrosoft Foundry 的提供者頁面,了解底層客戶端支援哪些功能。 如果你的自訂代理沒有呼叫聊天客戶端(例如上面提到的 echo 代理),就沒有工具可以調用。

使用代理程式

如果方法 AIAgent 都正確實作,代理程式將是標準 AIAgent 並支援標準代理程式作業。

想了解更多如何執行及與代理互動的資訊,請參閱代理 入門教學

Microsoft Agent Framework 支援透過從類別 BaseAgent 繼承並實作必要的方法來建立自訂代理。

本文檔介紹如何構建一個簡單的自定義代理,以使用字首回顯使用者輸入。 在大多數情況下,建立自己的代理程式將涉及更複雜的邏輯以及與 AI 服務的整合。

使用者入門

將必要的 Python 套件新增至您的專案。

pip install agent-framework-core

建立自訂代理人

代理程式通訊協定

該框架提供了 SupportsAgentRun 定義所有代理必須實現的接口的協議。 自訂代理程式可以直接實作此通訊協定,也可以擴充 BaseAgent 類別以方便使用。

from typing import Any, Literal, overload
from collections.abc import Awaitable, Sequence
from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentSession,
    Message,
    ResponseStream,
    SupportsAgentRun,
)

class MyCustomAgent(SupportsAgentRun):
    """A custom agent that implements the SupportsAgentRun directly."""

    @property
    def id(self) -> str:
        """Returns the ID of the agent."""
        ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[False] = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse]: ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[True],
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...

    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Execute the agent and return either an awaitable response or a ResponseStream."""
        ...

小提示

@overload簽名加入到run(),以使 IDE 和靜態型別檢查器根據streamAwaitable[AgentResponse]stream=FalseResponseStream[AgentResponseUpdate, AgentResponse]stream=True)來推斷回傳類型。

使用 BaseAgent

建議的方法是擴充 BaseAgent 類別,它提供了通用功能並簡化了實作:

import asyncio
from collections.abc import AsyncIterable, Awaitable, Sequence
from typing import Any, Literal, overload

from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentSession,
    BaseAgent,
    Content,
    Message,
    ResponseStream,
    normalize_messages,
)


class EchoAgent(BaseAgent):
    """A simple custom agent that echoes user messages with a prefix."""

    echo_prefix: str = "Echo: "

    def __init__(
        self,
        *,
        name: str | None = None,
        description: str | None = None,
        echo_prefix: str = "Echo: ",
        **kwargs: Any,
    ) -> None:
        super().__init__(
            name=name,
            description=description,
            echo_prefix=echo_prefix,
            **kwargs,
        )

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[False] = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse]: ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[True],
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...

    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Execute the agent.

        Args:
            messages: The message(s) to process.
            stream: If True, return a ResponseStream of updates.
            session: The conversation session (optional).

        Returns:
            When stream=False: An awaitable AgentResponse.
            When stream=True: A ResponseStream with AgentResponseUpdate items and final response support.
        """
        if stream:
            return ResponseStream(
                self._run_stream(messages=messages, session=session, **kwargs),
                finalizer=AgentResponse.from_updates,
            )
        return self._run(messages=messages, session=session, **kwargs)

    async def _run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        normalized_messages = normalize_messages(messages)

        if not normalized_messages:
            response_message = Message(
                role="assistant",
                contents=[Content.from_text("Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")],
            )
        else:
            last_message = normalized_messages[-1]
            echo_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"
            response_message = Message(role="assistant", contents=[Content.from_text(echo_text)])

        if session is not None:
            stored = session.state.setdefault("memory", {}).setdefault("messages", [])
            stored.extend(normalized_messages)
            stored.append(response_message)

        return AgentResponse(messages=[response_message])

    async def _run_stream(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentResponseUpdate]:
        normalized_messages = normalize_messages(messages)

        if not normalized_messages:
            response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
        else:
            last_message = normalized_messages[-1]
            response_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"

        words = response_text.split()
        for i, word in enumerate(words):
            chunk_text = f" {word}" if i > 0 else word
            yield AgentResponseUpdate(
                contents=[Content.from_text(chunk_text)],
                role="assistant",
            )
            await asyncio.sleep(0.1)

        if session is not None:
            complete_response = Message(role="assistant", contents=[Content.from_text(response_text)])
            stored = session.state.setdefault("memory", {}).setdefault("messages", [])
            stored.extend(normalized_messages)
            stored.append(complete_response)

Tools

自訂的 BaseAgent 會具有你決定賦予它的任何工具表面。 如果你包裝一個現有的聊天客戶端並傳遞 tools,你就會繼承該客戶端的工具支援——例如,請參考 OpenAIMicrosoft Foundry,或 Anthropic 的提供者頁面,了解底層客戶端所支援的內容。 如果你的自訂代理沒有呼叫聊天客戶端(例如上面提到的 echo 代理),就沒有工具可以調用。

使用代理程式

如果所有代理方法都正確實作,代理會支援標準操作,包括透過以下 ResponseStream方式進行串流:

stream = echo_agent.run("Stream this response", stream=True, session=echo_agent.create_session())
async for update in stream:
    print(update.text or "", end="", flush=True)
final_response = await stream.get_final_response()

想了解更多如何執行及與代理互動的資訊,請參閱代理 入門教學

後續步驟