Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Microsoft Agent Framework поддерживает создание пользовательских агентов, наследуя от AIAgent класса и реализуя необходимые методы.
В этой статье объясняется, как создать простой настраиваемый агент, который повторяет ввод пользователя в верхнем регистре. В большинстве случаев создание собственного агента будет включать более сложную логику и интеграцию со службой ИИ.
Начало работы
Добавьте необходимые пакеты NuGet в проект.
dotnet add package Microsoft.Agents.AI.Abstractions --prerelease
Создание пользовательского агента
Сеанс агента
Для создания пользовательского агента также требуется сеанс, который используется для отслеживания состояния отдельной беседы, включая журнал сообщений и любого другого состояния, которое агент должен поддерживать.
Чтобы облегчить начало работы, можно наследовать от различных базовых классов, реализующих общие механизмы хранения сессий.
-
InMemoryAgentSession— сохраняет журнал чата в памяти и может быть сериализован в JSON. -
ServiceIdAgentSession— не хранит журнал чата, но позволяет связать идентификатор с сеансом, в котором журнал чата может храниться внешне.
В этом примере вы будете использовать InMemoryAgentSession в качестве базового класса для пользовательского сеанса.
internal sealed class CustomAgentSession : InMemoryAgentSession
{
internal CustomAgentSession() : base() { }
internal CustomAgentSession(JsonElement serializedSessionState, JsonSerializerOptions? jsonSerializerOptions = null)
: base(serializedSessionState, jsonSerializerOptions) { }
}
Класс Agent
Затем создайте сам класс агента, наследуя от AIAgent класса.
internal sealed class UpperCaseParrotAgent : AIAgent
{
}
Создание сеансов
Сеансы всегда создаются через два фабричных метода из класса агента. Это позволяет агенту управлять созданием и десериализацией сеансов. Поэтому агенты могут добавлять любое дополнительное состояние или поведение, необходимое для сессии при её создании.
Необходимо реализовать два метода:
public override Task<AgentSession> CreateSessionAsync(CancellationToken cancellationToken = default)
=> Task.FromResult<AgentSession>(new CustomAgentSession());
public override Task<AgentSession> DeserializeSessionAsync(JsonElement serializedSession, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
=> Task.FromResult<AgentSession>(new CustomAgentSession(serializedSession, jsonSerializerOptions));
Логика основного агента
Основная логика агента заключается в том, чтобы принимать все входные сообщения, преобразовывать их текст в верхний регистр и возвращать их в виде сообщений ответа.
Добавьте следующий метод, чтобы реализовать эту логику.
Входные сообщения клонируются, так как для допустимых ответных сообщений необходимо изменить различные аспекты входных сообщений. Например, роль должна быть изменена на Assistant.
private static IEnumerable<ChatMessage> CloneAndToUpperCase(IEnumerable<ChatMessage> messages, string agentName) => messages.Select(x =>
{
var messageClone = x.Clone();
messageClone.Role = ChatRole.Assistant;
messageClone.MessageId = Guid.NewGuid().ToString();
messageClone.AuthorName = agentName;
messageClone.Contents = x.Contents.Select(c => c is TextContent tc ? new TextContent(tc.Text.ToUpperInvariant())
{
AdditionalProperties = tc.AdditionalProperties,
Annotations = tc.Annotations,
RawRepresentation = tc.RawRepresentation
} : c).ToList();
return messageClone;
});
Методы запуска агента
Наконец, необходимо реализовать два основных метода, которые используются для запуска агента: один для непотоковой передачи и один для потоковой передачи.
Для обоих методов необходимо убедиться, что предоставлен сеанс, а если нет, создайте новый сеанс.
Сообщения можно получить и передать ChatHistoryProvider в рамках сеанса.
Если этого не сделать, пользователь не сможет вести диалог с несколькими ходами с агентом, и каждый сеанс будет новым взаимодействием.
public override async Task<AgentResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
{
session ??= await this.CreateSessionAsync(cancellationToken);
// Get existing messages from the store
var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);
List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();
// Notify the session of the input and output messages.
var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
{
ResponseMessages = responseMessages
};
await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);
return new AgentResponse
{
AgentId = this.Id,
ResponseId = Guid.NewGuid().ToString(),
Messages = responseMessages
};
}
public override async IAsyncEnumerable<AgentResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
session ??= await this.CreateSessionAsync(cancellationToken);
// Get existing messages from the store
var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);
List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();
// Notify the session of the input and output messages.
var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
{
ResponseMessages = responseMessages
};
await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);
foreach (var message in responseMessages)
{
yield return new AgentResponseUpdate
{
AgentId = this.Id,
AuthorName = this.DisplayName,
Role = ChatRole.Assistant,
Contents = message.Contents,
ResponseId = Guid.NewGuid().ToString(),
MessageId = Guid.NewGuid().ToString()
};
}
}
Подсказка
Полные примеры запуска см. в примерах .NET .
Использование агента
AIAgent Если все методы реализованы правильно, агент будет стандартным AIAgent и поддерживает стандартные операции агента.
Дополнительные сведения о запуске и взаимодействии с агентами см. в руководствах по началу работы агента.
Microsoft Agent Framework поддерживает создание пользовательских агентов, наследуя от BaseAgent класса и реализуя необходимые методы.
В этом документе показано, как создать простой пользовательский агент, который возвращает введенные пользователем данные с префиксом. В большинстве случаев создание собственного агента будет включать более сложную логику и интеграцию со службой ИИ.
Начало работы
Добавьте необходимые пакеты Python в проект.
pip install agent-framework-core --pre
Создание пользовательского агента
Протокол агента
Платформа предоставляет протокол, определяющий SupportsAgentRun интерфейс, который должен реализовать все агенты. Пользовательские агенты могут напрямую реализовать этот протокол или расширить BaseAgent класс для большего удобства.
from typing import Any, Literal, overload
from collections.abc import Awaitable, Sequence
from agent_framework import (
AgentResponse,
AgentResponseUpdate,
AgentSession,
Message,
ResponseStream,
SupportsAgentRun,
)
class MyCustomAgent(SupportsAgentRun):
"""A custom agent that implements the SupportsAgentRun directly."""
@property
def id(self) -> str:
"""Returns the ID of the agent."""
...
@overload
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: Literal[False] = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse]: ...
@overload
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: Literal[True],
session: AgentSession | None = None,
**kwargs: Any,
) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: bool = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
"""Execute the agent and return either an awaitable response or a ResponseStream."""
...
Подсказка
Добавьте @overload подписи в run(), чтобы среды разработки и статические анализаторы типов выводили возвращаемый тип на основе stream (Awaitable[AgentResponse] для stream=False и ResponseStream[AgentResponseUpdate, AgentResponse] для stream=True).
Использование BaseAgent
Рекомендуемый BaseAgent подход — расширить класс, предоставляющий общие функциональные возможности и упрощающий реализацию:
import asyncio
from collections.abc import AsyncIterable, Awaitable, Sequence
from typing import Any, Literal, overload
from agent_framework import (
AgentResponse,
AgentResponseUpdate,
AgentSession,
BaseAgent,
Content,
Message,
ResponseStream,
normalize_messages,
)
class EchoAgent(BaseAgent):
"""A simple custom agent that echoes user messages with a prefix."""
echo_prefix: str = "Echo: "
def __init__(
self,
*,
name: str | None = None,
description: str | None = None,
echo_prefix: str = "Echo: ",
**kwargs: Any,
) -> None:
super().__init__(
name=name,
description=description,
echo_prefix=echo_prefix,
**kwargs,
)
@overload
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: Literal[False] = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse]: ...
@overload
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: Literal[True],
session: AgentSession | None = None,
**kwargs: Any,
) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...
def run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
stream: bool = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
"""Execute the agent.
Args:
messages: The message(s) to process.
stream: If True, return a ResponseStream of updates.
session: The conversation session (optional).
Returns:
When stream=False: An awaitable AgentResponse.
When stream=True: A ResponseStream with AgentResponseUpdate items and final response support.
"""
if stream:
return ResponseStream(
self._run_stream(messages=messages, session=session, **kwargs),
finalizer=AgentResponse.from_updates,
)
return self._run(messages=messages, session=session, **kwargs)
async def _run(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
session: AgentSession | None = None,
**kwargs: Any,
) -> AgentResponse:
normalized_messages = normalize_messages(messages)
if not normalized_messages:
response_message = Message(
role="assistant",
contents=[Content.from_text("Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")],
)
else:
last_message = normalized_messages[-1]
echo_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"
response_message = Message(role="assistant", contents=[Content.from_text(echo_text)])
if session is not None:
stored = session.state.setdefault("memory", {}).setdefault("messages", [])
stored.extend(normalized_messages)
stored.append(response_message)
return AgentResponse(messages=[response_message])
async def _run_stream(
self,
messages: str | Message | Sequence[str | Message] | None = None,
*,
session: AgentSession | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentResponseUpdate]:
normalized_messages = normalize_messages(messages)
if not normalized_messages:
response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
else:
last_message = normalized_messages[-1]
response_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"
words = response_text.split()
for i, word in enumerate(words):
chunk_text = f" {word}" if i > 0 else word
yield AgentResponseUpdate(
contents=[Content.from_text(chunk_text)],
role="assistant",
)
await asyncio.sleep(0.1)
if session is not None:
complete_response = Message(role="assistant", contents=[Content.from_text(response_text)])
stored = session.state.setdefault("memory", {}).setdefault("messages", [])
stored.extend(normalized_messages)
stored.append(complete_response)
Использование агента
Если все методы агента реализованы правильно, агент поддерживает стандартные операции, включая потоковую передачу через ResponseStream:
stream = echo_agent.run("Stream this response", stream=True, session=echo_agent.create_session())
async for update in stream:
print(update.text or "", end="", flush=True)
final_response = await stream.get_final_response()
Дополнительные сведения о запуске и взаимодействии с агентами см. в руководствах по началу работы агента.