Поделиться через


Настраиваемые агенты

Microsoft Agent Framework поддерживает создание пользовательских агентов, наследуя от AIAgent класса и реализуя необходимые методы.

В этой статье объясняется, как создать простой настраиваемый агент, который повторяет ввод пользователя в верхнем регистре. В большинстве случаев создание собственного агента будет включать более сложную логику и интеграцию со службой ИИ.

Начало работы

Добавьте необходимые пакеты NuGet в проект.

dotnet add package Microsoft.Agents.AI.Abstractions --prerelease

Создание пользовательского агента

Сеанс агента

Для создания пользовательского агента также требуется сеанс, который используется для отслеживания состояния отдельной беседы, включая журнал сообщений и любого другого состояния, которое агент должен поддерживать.

Чтобы облегчить начало работы, можно наследовать от различных базовых классов, реализующих общие механизмы хранения сессий.

  1. InMemoryAgentSession — сохраняет журнал чата в памяти и может быть сериализован в JSON.
  2. ServiceIdAgentSession — не хранит журнал чата, но позволяет связать идентификатор с сеансом, в котором журнал чата может храниться внешне.

В этом примере вы будете использовать InMemoryAgentSession в качестве базового класса для пользовательского сеанса.

internal sealed class CustomAgentSession : InMemoryAgentSession
{
    internal CustomAgentSession() : base() { }
    internal CustomAgentSession(JsonElement serializedSessionState, JsonSerializerOptions? jsonSerializerOptions = null)
        : base(serializedSessionState, jsonSerializerOptions) { }
}

Класс Agent

Затем создайте сам класс агента, наследуя от AIAgent класса.

internal sealed class UpperCaseParrotAgent : AIAgent
{
}

Создание сеансов

Сеансы всегда создаются через два фабричных метода из класса агента. Это позволяет агенту управлять созданием и десериализацией сеансов. Поэтому агенты могут добавлять любое дополнительное состояние или поведение, необходимое для сессии при её создании.

Необходимо реализовать два метода:

    public override Task<AgentSession> GetNewSessionAsync(CancellationToken cancellationToken = default) 
        => Task.FromResult<AgentSession>(new CustomAgentSession());

    public override Task<AgentSession> DeserializeSessionAsync(JsonElement serializedSession, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
        => Task.FromResult<AgentSession>(new CustomAgentSession(serializedSession, jsonSerializerOptions));

Логика основного агента

Основная логика агента заключается в том, чтобы принимать все входные сообщения, преобразовывать их текст в верхний регистр и возвращать их в виде сообщений ответа.

Добавьте следующий метод, чтобы реализовать эту логику. Входные сообщения клонируются, так как для допустимых ответных сообщений необходимо изменить различные аспекты входных сообщений. Например, роль должна быть изменена на Assistant.

    private static IEnumerable<ChatMessage> CloneAndToUpperCase(IEnumerable<ChatMessage> messages, string agentName) => messages.Select(x =>
        {
            var messageClone = x.Clone();
            messageClone.Role = ChatRole.Assistant;
            messageClone.MessageId = Guid.NewGuid().ToString();
            messageClone.AuthorName = agentName;
            messageClone.Contents = x.Contents.Select(c => c is TextContent tc ? new TextContent(tc.Text.ToUpperInvariant())
            {
                AdditionalProperties = tc.AdditionalProperties,
                Annotations = tc.Annotations,
                RawRepresentation = tc.RawRepresentation
            } : c).ToList();
            return messageClone;
        });

Методы запуска агента

Наконец, необходимо реализовать два основных метода, которые используются для запуска агента: один для непотоковой передачи и один для потоковой передачи.

Для обоих методов необходимо убедиться, что предоставлен сеанс, а если нет, создайте новый сеанс. Сообщения можно получить и передать ChatHistoryProvider в рамках сеанса. Если этого не сделать, пользователь не сможет вести диалог с несколькими ходами с агентом, и каждый сеанс будет новым взаимодействием.

    public override async Task<AgentResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
    {
        session ??= await this.GetNewSessionAsync(cancellationToken);

        // Get existing messages from the store
        var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
        var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);

        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();

        // Notify the session of the input and output messages.
        var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
        {
            ResponseMessages = responseMessages
        };
        await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);

        return new AgentResponse
        {
            AgentId = this.Id,
            ResponseId = Guid.NewGuid().ToString(),
            Messages = responseMessages
        };
    }

    public override async IAsyncEnumerable<AgentResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        session ??= await this.GetNewSessionAsync(cancellationToken);

        // Get existing messages from the store
        var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
        var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);

        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();

        // Notify the session of the input and output messages.
        var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
        {
            ResponseMessages = responseMessages
        };
        await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);

        foreach (var message in responseMessages)
        {
            yield return new AgentResponseUpdate
            {
                AgentId = this.Id,
                AuthorName = this.DisplayName,
                Role = ChatRole.Assistant,
                Contents = message.Contents,
                ResponseId = Guid.NewGuid().ToString(),
                MessageId = Guid.NewGuid().ToString()
            };
        }
    }

Использование агента

AIAgent Если все методы реализованы правильно, агент будет стандартным AIAgent и поддерживает стандартные операции агента.

Дополнительные сведения о запуске и взаимодействии с агентами см. в руководствах по началу работы агента.

Microsoft Agent Framework поддерживает создание пользовательских агентов, наследуя от BaseAgent класса и реализуя необходимые методы.

В этом документе показано, как создать простой пользовательский агент, который возвращает введенные пользователем данные с префиксом. В большинстве случаев создание собственного агента будет включать более сложную логику и интеграцию со службой ИИ.

Начало работы

Добавьте необходимые пакеты Python в проект.

pip install agent-framework-core --pre

Создание пользовательского агента

Протокол агента

Платформа предоставляет протокол, определяющий AgentProtocol интерфейс, который должен реализовать все агенты. Пользовательские агенты могут напрямую реализовать этот протокол или расширить BaseAgent класс для большего удобства.

from agent_framework import AgentProtocol, AgentResponse, AgentResponseUpdate, AgentThread, ChatMessage
from collections.abc import AsyncIterable
from typing import Any

class MyCustomAgent(AgentProtocol):
    """A custom agent that implements the AgentProtocol directly."""

    @property
    def id(self) -> str:
        """Returns the ID of the agent."""
        ...

    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        """Execute the agent and return a complete response."""
        ...

    def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentResponseUpdate]:
        """Execute the agent and yield streaming response updates."""
        ...

Использование BaseAgent

Рекомендуемый BaseAgent подход — расширить класс, предоставляющий общие функциональные возможности и упрощающий реализацию:

from agent_framework import (
    BaseAgent,
    AgentResponse,
    AgentResponseUpdate,
    AgentThread,
    ChatMessage,
    Role,
    TextContent,
)
from collections.abc import AsyncIterable
from typing import Any


class EchoAgent(BaseAgent):
    """A simple custom agent that echoes user messages with a prefix."""

    echo_prefix: str = "Echo: "

    def __init__(
        self,
        *,
        name: str | None = None,
        description: str | None = None,
        echo_prefix: str = "Echo: ",
        **kwargs: Any,
    ) -> None:
        """Initialize the EchoAgent.

        Args:
            name: The name of the agent.
            description: The description of the agent.
            echo_prefix: The prefix to add to echoed messages.
            **kwargs: Additional keyword arguments passed to BaseAgent.
        """
        super().__init__(
            name=name,
            description=description,
            echo_prefix=echo_prefix,
            **kwargs,
        )

    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        """Execute the agent and return a complete response.

        Args:
            messages: The message(s) to process.
            thread: The conversation thread (optional).
            **kwargs: Additional keyword arguments.

        Returns:
            An AgentResponse containing the agent's reply.
        """
        # Normalize input messages to a list
        normalized_messages = self._normalize_messages(messages)

        if not normalized_messages:
            response_message = ChatMessage(
                role=Role.ASSISTANT,
                contents=[TextContent(text="Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")],
            )
        else:
            # For simplicity, echo the last user message
            last_message = normalized_messages[-1]
            if last_message.text:
                echo_text = f"{self.echo_prefix}{last_message.text}"
            else:
                echo_text = f"{self.echo_prefix}[Non-text message received]"

            response_message = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=echo_text)])

        # Notify the thread of new messages if provided
        if thread is not None:
            await self._notify_thread_of_new_messages(thread, normalized_messages, response_message)

        return AgentResponse(messages=[response_message])

    async def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentResponseUpdate]:
        """Execute the agent and yield streaming response updates.

        Args:
            messages: The message(s) to process.
            thread: The conversation thread (optional).
            **kwargs: Additional keyword arguments.

        Yields:
            AgentResponseUpdate objects containing chunks of the response.
        """
        # Normalize input messages to a list
        normalized_messages = self._normalize_messages(messages)

        if not normalized_messages:
            response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
        else:
            # For simplicity, echo the last user message
            last_message = normalized_messages[-1]
            if last_message.text:
                response_text = f"{self.echo_prefix}{last_message.text}"
            else:
                response_text = f"{self.echo_prefix}[Non-text message received]"

        # Simulate streaming by yielding the response word by word
        words = response_text.split()
        for i, word in enumerate(words):
            # Add space before word except for the first one
            chunk_text = f" {word}" if i > 0 else word

            yield AgentResponseUpdate(
                contents=[TextContent(text=chunk_text)],
                role=Role.ASSISTANT,
            )

            # Small delay to simulate streaming
            await asyncio.sleep(0.1)

        # Notify the thread of the complete response if provided
        if thread is not None:
            complete_response = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text=response_text)])
            await self._notify_thread_of_new_messages(thread, normalized_messages, complete_response)

Использование агента

Если все методы агента реализованы правильно, агент будет поддерживать все стандартные операции агента.

Дополнительные сведения о запуске и взаимодействии с агентами см. в руководствах по началу работы агента.

Дальнейшие шаги