Microsoft Agent Framework では、 AIAgent クラスから継承し、必要なメソッドを実装することで、カスタム エージェントを構築できます。
この記事では、ユーザー入力を大文字で元に戻す単純なカスタム エージェントを構築する方法について説明します。 ほとんどの場合、独自のエージェントの構築には、より複雑なロジックと AI サービスとの統合が含まれます。
はじめに
必要な NuGet パッケージをプロジェクトに追加します。
dotnet add package Microsoft.Agents.AI.Abstractions --prerelease
カスタム エージェントを作成する
エージェント セッション
カスタム エージェントを作成するには、セッションも必要です。セッションは、メッセージ履歴やエージェントが保持する必要があるその他の状態など、単一の会話の状態を追跡するために使用されます。
簡単に開始できるように、共通のセッション ストレージ メカニズムを実装するさまざまな基底クラスから継承できます。
-
InMemoryAgentSession- チャット履歴をメモリに格納し、JSON にシリアル化できます。 -
ServiceIdAgentSession- チャット履歴は保存されませんが、チャット履歴を外部に保存できるセッションに ID を関連付けることができます。
この例では、カスタム セッションの基底クラスとして InMemoryAgentSession を使用します。
internal sealed class CustomAgentSession : InMemoryAgentSession
{
internal CustomAgentSession() : base() { }
internal CustomAgentSession(JsonElement serializedSessionState, JsonSerializerOptions? jsonSerializerOptions = null)
: base(serializedSessionState, jsonSerializerOptions) { }
}
Agent クラス
次に、 AIAgent クラスから継承して、エージェント クラス自体を作成します。
internal sealed class UpperCaseParrotAgent : AIAgent
{
}
セッションの構築方法
セッションは、常にエージェント クラスの 2 つのファクトリ メソッドを使用して作成されます。 これにより、エージェントはセッションの作成と逆シリアル化の方法を制御できます。 そのため、エージェントは、構築時にセッションに必要な追加の状態または動作をアタッチできます。
実装するには、次の 2 つのメソッドが必要です。
public override Task<AgentSession> CreateSessionAsync(CancellationToken cancellationToken = default)
=> Task.FromResult<AgentSession>(new CustomAgentSession());
public override Task<AgentSession> DeserializeSessionAsync(JsonElement serializedSession, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
=> Task.FromResult<AgentSession>(new CustomAgentSession(serializedSession, jsonSerializerOptions));
コア エージェント ロジック
エージェントの主要なロジックは、入力メッセージを受け取り、テキストを大文字に変換し、応答メッセージとして返します。
このロジックを格納するには、次のメソッドを追加します。
入力メッセージのさまざまな側面を有効な応答メッセージに変更する必要があるため、入力メッセージは複製されます。 たとえば、ロールを Assistant に変更する必要があります。
private static IEnumerable<ChatMessage> CloneAndToUpperCase(IEnumerable<ChatMessage> messages, string agentName) => messages.Select(x =>
{
var messageClone = x.Clone();
messageClone.Role = ChatRole.Assistant;
messageClone.MessageId = Guid.NewGuid().ToString();
messageClone.AuthorName = agentName;
messageClone.Contents = x.Contents.Select(c => c is TextContent tc ? new TextContent(tc.Text.ToUpperInvariant())
{
AdditionalProperties = tc.AdditionalProperties,
Annotations = tc.Annotations,
RawRepresentation = tc.RawRepresentation
} : c).ToList();
return messageClone;
});
エージェントの実行メソッド
最後に、エージェントの実行に使用される 2 つのコア メソッド (非ストリーミング用とストリーミング用) を実装する必要があります。
どちらの方法でも、セッションが提供されていることを確認し、提供されていない場合は新しいセッションを作成する必要があります。
メッセージを取得し、セッション上の ChatHistoryProvider に渡すことができます。
これを行わないと、ユーザーはエージェントとの複数ターンの会話を行うことができず、各実行は新しいやり取りになります。
public override async Task<AgentResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
session ??= await this.CreateSessionAsync(cancellationToken);
// Get existing messages from the store
var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);
List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();
// Notify the session of the input and output messages.
var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
{
ResponseMessages = responseMessages
};
await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);
return new AgentResponse
{
AgentId = this.Id,
ResponseId = Guid.NewGuid().ToString(),
Messages = responseMessages
};
}
public override async IAsyncEnumerable<AgentResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
session ??= await this.CreateSessionAsync(cancellationToken);
// Get existing messages from the store
var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);
List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();
// Notify the session of the input and output messages.
var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
{
ResponseMessages = responseMessages
};
await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);
foreach (var message in responseMessages)
{
yield return new AgentResponseUpdate
{
AgentId = this.Id,
AuthorName = this.DisplayName,
Role = ChatRole.Assistant,
Contents = message.Contents,
ResponseId = Guid.NewGuid().ToString(),
MessageId = Guid.NewGuid().ToString()
};
}
}
ヒント
実行可能な完全な例については、 .NET サンプル を参照してください。
エージェントの使用
AIAgentメソッドがすべて正しく実装されている場合、エージェントは標準のAIAgentとなり、標準のエージェント操作をサポートします。
エージェントを実行して操作する方法の詳細については、 エージェントの概要に関するチュートリアルを参照してください。
Microsoft Agent Framework では、 BaseAgent クラスから継承し、必要なメソッドを実装することで、カスタム エージェントを構築できます。
このドキュメントでは、プレフィックスを使用してユーザー入力をエコー バックする単純なカスタム エージェントを構築する方法について説明します。 ほとんどの場合、独自のエージェントの構築には、より複雑なロジックと AI サービスとの統合が含まれます。
はじめに
必要な Python パッケージをプロジェクトに追加します。
pip install agent-framework-core --pre
カスタム エージェントを作成する
エージェント プロトコル
フレームワークには、すべてのエージェントが実装する必要があるインターフェイスを定義する SupportsAgentRun プロトコルが用意されています。 カスタム エージェントは、このプロトコルを直接実装することも、便宜上 BaseAgent クラスを拡張することもできます。
from typing import Any, Literal, overload
from collections.abc import Awaitable, Sequence
from agent_framework import (
AgentResponse,
AgentResponseUpdate,
AgentSession,
Message,
ResponseStream,
SupportsAgentRun,
)
class MyCustomAgent(SupportsAgentRun):
"""A custom agent that implements the SupportsAgentRun directly."""
@property
def id(self) -> str:
"""Returns the ID of the agent."""
...
@overload
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: Literal[False] = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse]: ...
@overload
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: Literal[True],
session: AgentSession | None = None,
**kwargs: Any,
) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: bool = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
"""Execute the agent and return either an awaitable response or a ResponseStream."""
...
ヒント
@overloadシグネチャをrun()に追加して、IDE と静的型チェッカーが@overloadに基づいて戻り値の型を推論するようにします (Awaitable[AgentResponse]の場合はstream=False、ResponseStream[AgentResponseUpdate, AgentResponse]の場合はstream=True)。
BaseAgent の使用
推奨される方法は、共通の機能を提供し、実装を簡略化する BaseAgent クラスを拡張することです。
import asyncio
from collections.abc import AsyncIterable, Awaitable, Sequence
from typing import Any, Literal, overload
from agent_framework import (
AgentResponse,
AgentResponseUpdate,
AgentSession,
BaseAgent,
Content,
Message,
ResponseStream,
normalize_messages,
)
class EchoAgent(BaseAgent):
"""A simple custom agent that echoes user messages with a prefix."""
echo_prefix: str = "Echo: "
def __init__(
self,
*,
name: str | None = None,
description: str | None = None,
echo_prefix: str = "Echo: ",
**kwargs: Any,
) -> None:
super().__init__(
name=name,
description=description,
echo_prefix=echo_prefix,
**kwargs,
)
@overload
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: Literal[False] = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse]: ...
@overload
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: Literal[True],
session: AgentSession | None = None,
**kwargs: Any,
) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: bool = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
"""Execute the agent.
Args:
messages: The message(s) to process.
stream: If True, return a ResponseStream of updates.
session: The conversation session (optional).
Returns:
When stream=False: An awaitable AgentResponse.
When stream=True: A ResponseStream with AgentResponseUpdate items and final response support.
"""
if stream:
return ResponseStream(
self._run_stream(messages=messages, session=session, **kwargs),
finalizer=AgentResponse.from_updates,
)
return self._run(messages=messages, session=session, **kwargs)
async def _run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
session: AgentSession | None = None,
**kwargs: Any,
) -> AgentResponse:
normalized_messages = normalize_messages(messages)
if not normalized_messages:
response_message = Message(
role="assistant",
contents=[Content.from_text("Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")],
)
else:
last_message = normalized_messages[-1]
echo_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"
response_message = Message(role="assistant", contents=[Content.from_text(echo_text)])
if session is not None:
stored = session.state.setdefault("memory", {}).setdefault("messages", [])
stored.extend(normalized_messages)
stored.append(response_message)
return AgentResponse(messages=[response_message])
async def _run_stream(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
session: AgentSession | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentResponseUpdate]:
normalized_messages = normalize_messages(messages)
if not normalized_messages:
response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
else:
last_message = normalized_messages[-1]
response_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"
words = response_text.split()
for i, word in enumerate(words):
chunk_text = f" {word}" if i > 0 else word
yield AgentResponseUpdate(
contents=[Content.from_text(chunk_text)],
role="assistant",
)
await asyncio.sleep(0.1)
if session is not None:
complete_response = Message(role="assistant", contents=[Content.from_text(response_text)])
stored = session.state.setdefault("memory", {}).setdefault("messages", [])
stored.extend(normalized_messages)
stored.append(complete_response)
エージェントの使用
エージェント メソッドがすべて正しく実装されている場合、エージェントは、 ResponseStream経由のストリーミングを含む標準操作をサポートします。
stream = echo_agent.run("Stream this response", stream=True, session=echo_agent.create_session())
async for update in stream:
print(update.text or "", end="", flush=True)
final_response = await stream.get_final_response()
エージェントを実行して操作する方法の詳細については、 エージェントの概要に関するチュートリアルを参照してください。