Microsoft Agent Framework 支援透過從類別 AIAgent 繼承並實作必要的方法來建立自訂代理。
本文說明如何建立一個簡單的自訂代理程式,並以大寫字母形式回應使用者的輸入。 在大多數情況下,建立自己的代理程式將涉及更複雜的邏輯以及與 AI 服務的整合。
使用者入門
將必要的 NuGet 套件新增至您的專案。
dotnet add package Microsoft.Agents.AI.Abstractions --prerelease
建立自訂代理人
代理人會議
要建立自訂代理,你還需要一個會話,用來追蹤單一對話的狀態,包括訊息歷史,以及代理需要維持的其他狀態。
為了方便上手,你可以繼承自各種實作共通會話儲存機制的基底類別。
-
InMemoryAgentSession- 將聊天記錄儲存在記憶體中,並可序列化為 JSON。 -
ServiceIdAgentSession- 不儲存聊天紀錄,但允許你將 ID 與會話關聯,並可將聊天紀錄儲存在外部。
在這個例子中,你會將 InMemoryAgentSession 用作自訂會話的基底類別。
internal sealed class CustomAgentSession : InMemoryAgentSession
{
internal CustomAgentSession() : base() { }
internal CustomAgentSession(JsonElement serializedSessionState, JsonSerializerOptions? jsonSerializerOptions = null)
: base(serializedSessionState, jsonSerializerOptions) { }
}
Agent 類別
接著,透過從 AIAgent 類別繼承,建立代理程式類別本身。
internal sealed class UpperCaseParrotAgent : AIAgent
{
}
建構工作階段
Sessions 總是透過代理程式類別上的兩個工廠方法建立。 這使代理可以控制會話的創建和反序列化過程。 因此,代理在建構時可以附加任何額外的狀態或行為給會話。
需要實作兩種方法:
public override Task<AgentSession> CreateSessionAsync(CancellationToken cancellationToken = default)
=> Task.FromResult<AgentSession>(new CustomAgentSession());
public override Task<AgentSession> DeserializeSessionAsync(JsonElement serializedSession, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
=> Task.FromResult<AgentSession>(new CustomAgentSession(serializedSession, jsonSerializerOptions));
核心代理程式邏輯
代理的核心邏輯是接收任何輸入訊息,將其文字轉換為大寫,並以回應訊息的形式回傳。
新增以下方法來包含此邏輯。
輸入訊息會被複製,因為輸入訊息的各個面向必須修改才能成為有效的回應訊息。 例如,角色必須改為 Assistant。
private static IEnumerable<ChatMessage> CloneAndToUpperCase(IEnumerable<ChatMessage> messages, string agentName) => messages.Select(x =>
{
var messageClone = x.Clone();
messageClone.Role = ChatRole.Assistant;
messageClone.MessageId = Guid.NewGuid().ToString();
messageClone.AuthorName = agentName;
messageClone.Contents = x.Contents.Select(c => c is TextContent tc ? new TextContent(tc.Text.ToUpperInvariant())
{
AdditionalProperties = tc.AdditionalProperties,
Annotations = tc.Annotations,
RawRepresentation = tc.RawRepresentation
} : c).ToList();
return messageClone;
});
代理程式執行方法
最後,您需要實作兩個核心方法來執行代理程式:一種是非串流模式,另一種是串流模式。
兩種方法都需要確保有提供會話,如果沒有,就建立一個新的會話。
訊息可以在會話中被檢索,並傳遞給ChatHistoryProvider。
如果不這麼做,使用者將無法與代理進行多回合對話,每次執行都會是全新的互動。
public override async Task<AgentResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
session ??= await this.CreateSessionAsync(cancellationToken);
// Get existing messages from the store
var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);
List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();
// Notify the session of the input and output messages.
var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
{
ResponseMessages = responseMessages
};
await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);
return new AgentResponse
{
AgentId = this.Id,
ResponseId = Guid.NewGuid().ToString(),
Messages = responseMessages
};
}
public override async IAsyncEnumerable<AgentResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
session ??= await this.CreateSessionAsync(cancellationToken);
// Get existing messages from the store
var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);
List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();
// Notify the session of the input and output messages.
var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
{
ResponseMessages = responseMessages
};
await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);
foreach (var message in responseMessages)
{
yield return new AgentResponseUpdate
{
AgentId = this.Id,
AuthorName = this.DisplayName,
Role = ChatRole.Assistant,
Contents = message.Contents,
ResponseId = Guid.NewGuid().ToString(),
MessageId = Guid.NewGuid().ToString()
};
}
}
小提示
完整可執行範例請參閱 .NET 範例 。
使用代理程式
如果方法 AIAgent 都正確實作,代理程式將是標準 AIAgent 並支援標準代理程式作業。
想了解更多如何執行及與代理互動的資訊,請參閱代理 入門教學。
Microsoft Agent Framework 支援透過從類別 BaseAgent 繼承並實作必要的方法來建立自訂代理。
本文檔介紹如何構建一個簡單的自定義代理,以使用字首回顯使用者輸入。 在大多數情況下,建立自己的代理程式將涉及更複雜的邏輯以及與 AI 服務的整合。
使用者入門
將必要的 Python 套件新增至您的專案。
pip install agent-framework-core --pre
建立自訂代理人
代理程式通訊協定
該框架提供了 SupportsAgentRun 定義所有代理必須實現的接口的協議。 自訂代理程式可以直接實作此通訊協定,也可以擴充 BaseAgent 類別以方便使用。
from typing import Any, Literal, overload
from collections.abc import Awaitable, Sequence
from agent_framework import (
AgentResponse,
AgentResponseUpdate,
AgentSession,
Message,
ResponseStream,
SupportsAgentRun,
)
class MyCustomAgent(SupportsAgentRun):
"""A custom agent that implements the SupportsAgentRun directly."""
@property
def id(self) -> str:
"""Returns the ID of the agent."""
...
@overload
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: Literal[False] = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse]: ...
@overload
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: Literal[True],
session: AgentSession | None = None,
**kwargs: Any,
) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: bool = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
"""Execute the agent and return either an awaitable response or a ResponseStream."""
...
小提示
將@overload簽名加入到run(),以使 IDE 和靜態型別檢查器根據stream(Awaitable[AgentResponse] 為stream=False 和 ResponseStream[AgentResponseUpdate, AgentResponse] 為stream=True)來推斷回傳類型。
使用 BaseAgent
建議的方法是擴充 BaseAgent 類別,它提供了通用功能並簡化了實作:
import asyncio
from collections.abc import AsyncIterable, Awaitable, Sequence
from typing import Any, Literal, overload
from agent_framework import (
AgentResponse,
AgentResponseUpdate,
AgentSession,
BaseAgent,
Content,
Message,
ResponseStream,
normalize_messages,
)
class EchoAgent(BaseAgent):
"""A simple custom agent that echoes user messages with a prefix."""
echo_prefix: str = "Echo: "
def __init__(
self,
*,
name: str | None = None,
description: str | None = None,
echo_prefix: str = "Echo: ",
**kwargs: Any,
) -> None:
super().__init__(
name=name,
description=description,
echo_prefix=echo_prefix,
**kwargs,
)
@overload
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: Literal[False] = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse]: ...
@overload
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: Literal[True],
session: AgentSession | None = None,
**kwargs: Any,
) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: bool = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
"""Execute the agent.
Args:
messages: The message(s) to process.
stream: If True, return a ResponseStream of updates.
session: The conversation session (optional).
Returns:
When stream=False: An awaitable AgentResponse.
When stream=True: A ResponseStream with AgentResponseUpdate items and final response support.
"""
if stream:
return ResponseStream(
self._run_stream(messages=messages, session=session, **kwargs),
finalizer=AgentResponse.from_updates,
)
return self._run(messages=messages, session=session, **kwargs)
async def _run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
session: AgentSession | None = None,
**kwargs: Any,
) -> AgentResponse:
normalized_messages = normalize_messages(messages)
if not normalized_messages:
response_message = Message(
role="assistant",
contents=[Content.from_text("Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")],
)
else:
last_message = normalized_messages[-1]
echo_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"
response_message = Message(role="assistant", contents=[Content.from_text(echo_text)])
if session is not None:
stored = session.state.setdefault("memory", {}).setdefault("messages", [])
stored.extend(normalized_messages)
stored.append(response_message)
return AgentResponse(messages=[response_message])
async def _run_stream(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
session: AgentSession | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentResponseUpdate]:
normalized_messages = normalize_messages(messages)
if not normalized_messages:
response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
else:
last_message = normalized_messages[-1]
response_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"
words = response_text.split()
for i, word in enumerate(words):
chunk_text = f" {word}" if i > 0 else word
yield AgentResponseUpdate(
contents=[Content.from_text(chunk_text)],
role="assistant",
)
await asyncio.sleep(0.1)
if session is not None:
complete_response = Message(role="assistant", contents=[Content.from_text(response_text)])
stored = session.state.setdefault("memory", {}).setdefault("messages", [])
stored.extend(normalized_messages)
stored.append(complete_response)
使用代理程式
如果所有代理方法都正確實作,代理會支援標準操作,包括透過以下 ResponseStream方式進行串流:
stream = echo_agent.run("Stream this response", stream=True, session=echo_agent.create_session())
async for update in stream:
print(update.text or "", end="", flush=True)
final_response = await stream.get_final_response()
想了解更多如何執行及與代理互動的資訊,請參閱代理 入門教學。