共用方式為


自訂代理程式

Microsoft Agent Framework 支援透過從類別 AIAgent 繼承並實作必要的方法來建立自訂代理。

本文說明如何建立一個簡單的自訂代理程式,並以大寫字母形式回應使用者的輸入。 在大多數情況下,建立自己的代理程式將涉及更複雜的邏輯以及與 AI 服務的整合。

使用者入門

將必要的 NuGet 套件新增至您的專案。

dotnet add package Microsoft.Agents.AI.Abstractions --prerelease

建立自訂代理人

代理人會議

要建立自訂代理,你還需要一個會話,用來追蹤單一對話的狀態,包括訊息歷史,以及代理需要維持的其他狀態。

為了方便上手,你可以繼承自各種實作共通會話儲存機制的基底類別。

  1. InMemoryAgentSession - 將聊天記錄儲存在記憶體中,並可序列化為 JSON。
  2. ServiceIdAgentSession - 不儲存聊天紀錄,但允許你將 ID 與會話關聯,並可將聊天紀錄儲存在外部。

在這個例子中,你會將 InMemoryAgentSession 用作自訂會話的基底類別。

internal sealed class CustomAgentSession : InMemoryAgentSession
{
    internal CustomAgentSession() : base() { }
    internal CustomAgentSession(JsonElement serializedSessionState, JsonSerializerOptions? jsonSerializerOptions = null)
        : base(serializedSessionState, jsonSerializerOptions) { }
}

Agent 類別

接著,透過從 AIAgent 類別繼承,建立代理程式類別本身。

internal sealed class UpperCaseParrotAgent : AIAgent
{
}

建構工作階段

Sessions 總是透過代理程式類別上的兩個工廠方法建立。 這使代理可以控制會話的創建和反序列化過程。 因此,代理在建構時可以附加任何額外的狀態或行為給會話。

需要實作兩種方法:

    public override Task<AgentSession> CreateSessionAsync(CancellationToken cancellationToken = default) 
        => Task.FromResult<AgentSession>(new CustomAgentSession());

    public override Task<AgentSession> DeserializeSessionAsync(JsonElement serializedSession, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
        => Task.FromResult<AgentSession>(new CustomAgentSession(serializedSession, jsonSerializerOptions));

核心代理程式邏輯

代理的核心邏輯是接收任何輸入訊息,將其文字轉換為大寫,並以回應訊息的形式回傳。

新增以下方法來包含此邏輯。 輸入訊息會被複製,因為輸入訊息的各個面向必須修改才能成為有效的回應訊息。 例如,角色必須改為 Assistant

    private static IEnumerable<ChatMessage> CloneAndToUpperCase(IEnumerable<ChatMessage> messages, string agentName) => messages.Select(x =>
        {
            var messageClone = x.Clone();
            messageClone.Role = ChatRole.Assistant;
            messageClone.MessageId = Guid.NewGuid().ToString();
            messageClone.AuthorName = agentName;
            messageClone.Contents = x.Contents.Select(c => c is TextContent tc ? new TextContent(tc.Text.ToUpperInvariant())
            {
                AdditionalProperties = tc.AdditionalProperties,
                Annotations = tc.Annotations,
                RawRepresentation = tc.RawRepresentation
            } : c).ToList();
            return messageClone;
        });

代理程式執行方法

最後,您需要實作兩個核心方法來執行代理程式:一種是非串流模式,另一種是串流模式。

兩種方法都需要確保有提供會話,如果沒有,就建立一個新的會話。 訊息可以在會話中被檢索,並傳遞給ChatHistoryProvider。 如果不這麼做,使用者將無法與代理進行多回合對話,每次執行都會是全新的互動。

    public override async Task<AgentResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
    {
        session ??= await this.CreateSessionAsync(cancellationToken);

        // Get existing messages from the store
        var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
        var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);

        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();

        // Notify the session of the input and output messages.
        var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
        {
            ResponseMessages = responseMessages
        };
        await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);

        return new AgentResponse
        {
            AgentId = this.Id,
            ResponseId = Guid.NewGuid().ToString(),
            Messages = responseMessages
        };
    }

    public override async IAsyncEnumerable<AgentResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        session ??= await this.CreateSessionAsync(cancellationToken);

        // Get existing messages from the store
        var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
        var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);

        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();

        // Notify the session of the input and output messages.
        var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
        {
            ResponseMessages = responseMessages
        };
        await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);

        foreach (var message in responseMessages)
        {
            yield return new AgentResponseUpdate
            {
                AgentId = this.Id,
                AuthorName = this.DisplayName,
                Role = ChatRole.Assistant,
                Contents = message.Contents,
                ResponseId = Guid.NewGuid().ToString(),
                MessageId = Guid.NewGuid().ToString()
            };
        }
    }

小提示

完整可執行範例請參閱 .NET 範例

使用代理程式

如果方法 AIAgent 都正確實作,代理程式將是標準 AIAgent 並支援標準代理程式作業。

想了解更多如何執行及與代理互動的資訊,請參閱代理 入門教學

Microsoft Agent Framework 支援透過從類別 BaseAgent 繼承並實作必要的方法來建立自訂代理。

本文檔介紹如何構建一個簡單的自定義代理,以使用字首回顯使用者輸入。 在大多數情況下,建立自己的代理程式將涉及更複雜的邏輯以及與 AI 服務的整合。

使用者入門

將必要的 Python 套件新增至您的專案。

pip install agent-framework-core --pre

建立自訂代理人

代理程式通訊協定

該框架提供了 SupportsAgentRun 定義所有代理必須實現的接口的協議。 自訂代理程式可以直接實作此通訊協定,也可以擴充 BaseAgent 類別以方便使用。

from typing import Any, Literal, overload
from collections.abc import Awaitable, Sequence
from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentSession,
    Message,
    ResponseStream,
    SupportsAgentRun,
)

class MyCustomAgent(SupportsAgentRun):
    """A custom agent that implements the SupportsAgentRun directly."""

    @property
    def id(self) -> str:
        """Returns the ID of the agent."""
        ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[False] = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse]: ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[True],
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...

    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Execute the agent and return either an awaitable response or a ResponseStream."""
        ...

小提示

@overload簽名加入到run(),以使 IDE 和靜態型別檢查器根據streamAwaitable[AgentResponse]stream=FalseResponseStream[AgentResponseUpdate, AgentResponse]stream=True)來推斷回傳類型。

使用 BaseAgent

建議的方法是擴充 BaseAgent 類別,它提供了通用功能並簡化了實作:

import asyncio
from collections.abc import AsyncIterable, Awaitable, Sequence
from typing import Any, Literal, overload

from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentSession,
    BaseAgent,
    Content,
    Message,
    ResponseStream,
    normalize_messages,
)


class EchoAgent(BaseAgent):
    """A simple custom agent that echoes user messages with a prefix."""

    echo_prefix: str = "Echo: "

    def __init__(
        self,
        *,
        name: str | None = None,
        description: str | None = None,
        echo_prefix: str = "Echo: ",
        **kwargs: Any,
    ) -> None:
        super().__init__(
            name=name,
            description=description,
            echo_prefix=echo_prefix,
            **kwargs,
        )

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[False] = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse]: ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[True],
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...

    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Execute the agent.

        Args:
            messages: The message(s) to process.
            stream: If True, return a ResponseStream of updates.
            session: The conversation session (optional).

        Returns:
            When stream=False: An awaitable AgentResponse.
            When stream=True: A ResponseStream with AgentResponseUpdate items and final response support.
        """
        if stream:
            return ResponseStream(
                self._run_stream(messages=messages, session=session, **kwargs),
                finalizer=AgentResponse.from_updates,
            )
        return self._run(messages=messages, session=session, **kwargs)

    async def _run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        normalized_messages = normalize_messages(messages)

        if not normalized_messages:
            response_message = Message(
                role="assistant",
                contents=[Content.from_text("Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")],
            )
        else:
            last_message = normalized_messages[-1]
            echo_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"
            response_message = Message(role="assistant", contents=[Content.from_text(echo_text)])

        if session is not None:
            stored = session.state.setdefault("memory", {}).setdefault("messages", [])
            stored.extend(normalized_messages)
            stored.append(response_message)

        return AgentResponse(messages=[response_message])

    async def _run_stream(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentResponseUpdate]:
        normalized_messages = normalize_messages(messages)

        if not normalized_messages:
            response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
        else:
            last_message = normalized_messages[-1]
            response_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"

        words = response_text.split()
        for i, word in enumerate(words):
            chunk_text = f" {word}" if i > 0 else word
            yield AgentResponseUpdate(
                contents=[Content.from_text(chunk_text)],
                role="assistant",
            )
            await asyncio.sleep(0.1)

        if session is not None:
            complete_response = Message(role="assistant", contents=[Content.from_text(response_text)])
            stored = session.state.setdefault("memory", {}).setdefault("messages", [])
            stored.extend(normalized_messages)
            stored.append(complete_response)

使用代理程式

如果所有代理方法都正確實作,代理會支援標準操作,包括透過以下 ResponseStream方式進行串流:

stream = echo_agent.run("Stream this response", stream=True, session=echo_agent.create_session())
async for update in stream:
    print(update.text or "", end="", flush=True)
final_response = await stream.get_final_response()

想了解更多如何執行及與代理互動的資訊,請參閱代理 入門教學

後續步驟