Compartir a través de


Agentes personalizados

Microsoft Agent Framework admite la compilación de agentes personalizados heredando de la AIAgent clase e implementando los métodos necesarios.

En este artículo se muestra cómo crear un agente personalizado sencillo que devuelve la entrada del usuario en mayúsculas. En la mayoría de los casos, la creación de su propio agente implicará una lógica y una integración más complejas con un servicio de inteligencia artificial.

Introducción

Agregue los paquetes NuGet necesarios al proyecto.

dotnet add package Microsoft.Agents.AI.Abstractions --prerelease

Creación de un agente personalizado

La sesión del agente

Para crear un agente personalizado, también necesita una sesión, que se usa para realizar un seguimiento del estado de una sola conversación, incluido el historial de mensajes, y cualquier otro estado que el agente necesite mantener.

Para facilitar la introducción, puede heredar de varias clases base que implementan mecanismos comunes de almacenamiento de sesión.

  1. InMemoryAgentSession : almacena el historial de chats en la memoria y se puede serializar en JSON.
  2. ServiceIdAgentSession : no almacena ningún historial de chat, pero le permite asociar un identificador a la sesión, en el que el historial de chat se puede almacenar externamente.

En este ejemplo, usará InMemoryAgentSession como clase base para la sesión personalizada.

internal sealed class CustomAgentSession : InMemoryAgentSession
{
    internal CustomAgentSession() : base() { }
    internal CustomAgentSession(JsonElement serializedSessionState, JsonSerializerOptions? jsonSerializerOptions = null)
        : base(serializedSessionState, jsonSerializerOptions) { }
}

La clase Agent

A continuación, cree la propia clase del agente heredando de la AIAgent clase .

internal sealed class UpperCaseParrotAgent : AIAgent
{
}

Construyendo sesiones

Las sesiones siempre se crean a través de dos métodos factory en la clase del agente. Esto permite al agente controlar cómo se crean y deserializan las sesiones. Por lo tanto, los agentes pueden adjuntar cualquier estado o comportamiento adicional necesario a la sesión cuando se construye.

Es necesario implementar dos métodos:

    public override Task<AgentSession> CreateSessionAsync(CancellationToken cancellationToken = default) 
        => Task.FromResult<AgentSession>(new CustomAgentSession());

    public override Task<AgentSession> DeserializeSessionAsync(JsonElement serializedSession, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
        => Task.FromResult<AgentSession>(new CustomAgentSession(serializedSession, jsonSerializerOptions));

Lógica del agente principal

La lógica principal del agente es tomar los mensajes de entrada, convertir su texto en mayúsculas y devolverlos como mensajes de respuesta.

Agregue el siguiente método para contener esta lógica. Los mensajes de entrada se clonan, ya que es necesario modificar varios aspectos de los mensajes de entrada para que sean mensajes de respuesta válidos. Por ejemplo, el rol debe cambiarse a Assistant.

    private static IEnumerable<ChatMessage> CloneAndToUpperCase(IEnumerable<ChatMessage> messages, string agentName) => messages.Select(x =>
        {
            var messageClone = x.Clone();
            messageClone.Role = ChatRole.Assistant;
            messageClone.MessageId = Guid.NewGuid().ToString();
            messageClone.AuthorName = agentName;
            messageClone.Contents = x.Contents.Select(c => c is TextContent tc ? new TextContent(tc.Text.ToUpperInvariant())
            {
                AdditionalProperties = tc.AdditionalProperties,
                Annotations = tc.Annotations,
                RawRepresentation = tc.RawRepresentation
            } : c).ToList();
            return messageClone;
        });

Métodos de ejecución del agente

Por último, debe implementar los dos métodos principales que se usan para ejecutar el agente: uno para no streaming y otro para el streaming.

Para ambos métodos, debe asegurarse de que se proporciona una sesión y, si no es así, crear una nueva sesión. Los mensajes se pueden recuperar y pasar a ChatHistoryProvider en la sesión. Si no lo hace, el usuario no podrá tener una conversación multiturno con el agente y cada ejecución será una interacción nueva.

    public override async Task<AgentResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
    {
        session ??= await this.CreateSessionAsync(cancellationToken);

        // Get existing messages from the store
        var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
        var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);

        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();

        // Notify the session of the input and output messages.
        var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
        {
            ResponseMessages = responseMessages
        };
        await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);

        return new AgentResponse
        {
            AgentId = this.Id,
            ResponseId = Guid.NewGuid().ToString(),
            Messages = responseMessages
        };
    }

    public override async IAsyncEnumerable<AgentResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        session ??= await this.CreateSessionAsync(cancellationToken);

        // Get existing messages from the store
        var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
        var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);

        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();

        // Notify the session of the input and output messages.
        var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
        {
            ResponseMessages = responseMessages
        };
        await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);

        foreach (var message in responseMessages)
        {
            yield return new AgentResponseUpdate
            {
                AgentId = this.Id,
                AuthorName = this.DisplayName,
                Role = ChatRole.Assistant,
                Contents = message.Contents,
                ResponseId = Guid.NewGuid().ToString(),
                MessageId = Guid.NewGuid().ToString()
            };
        }
    }

Sugerencia

Consulte los ejemplos de .NET para obtener ejemplos completos de ejecución.

Uso del agente

Si todos los AIAgent métodos se implementan correctamente, el agente sería un estándar AIAgent y admitiría operaciones de agente estándar.

Para obtener más información sobre cómo ejecutar e interactuar con agentes, consulte los tutoriales de introducción al agente.

Microsoft Agent Framework admite la compilación de agentes personalizados heredando de la BaseAgent clase e implementando los métodos necesarios.

En este documento se muestra cómo crear un agente personalizado sencillo que devuelve la entrada del usuario con un prefijo. En la mayoría de los casos, la creación de su propio agente implicará una lógica y una integración más complejas con un servicio de inteligencia artificial.

Introducción

Agregue los paquetes de Python necesarios al proyecto.

pip install agent-framework-core --pre

Creación de un agente personalizado

El protocolo del agente

El marco proporciona el SupportsAgentRun protocolo que define la interfaz que todos los agentes deben implementar. Los agentes personalizados pueden implementar este protocolo directamente o ampliar la BaseAgent clase para mayor comodidad.

from typing import Any, Literal, overload
from collections.abc import Awaitable, Sequence
from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentSession,
    Message,
    ResponseStream,
    SupportsAgentRun,
)

class MyCustomAgent(SupportsAgentRun):
    """A custom agent that implements the SupportsAgentRun directly."""

    @property
    def id(self) -> str:
        """Returns the ID of the agent."""
        ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[False] = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse]: ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[True],
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...

    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Execute the agent and return either an awaitable response or a ResponseStream."""
        ...

Sugerencia

Agregue @overload firmas a run() para que los IDE y los comprobadores de tipos estáticos infieran el tipo de valor devuelto en función de stream (Awaitable[AgentResponse] para stream=False y ResponseStream[AgentResponseUpdate, AgentResponse] para stream=True).

Uso de BaseAgent

El enfoque recomendado es ampliar la BaseAgent clase , que proporciona funcionalidad común y simplifica la implementación:

import asyncio
from collections.abc import AsyncIterable, Awaitable, Sequence
from typing import Any, Literal, overload

from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentSession,
    BaseAgent,
    Content,
    Message,
    ResponseStream,
    normalize_messages,
)


class EchoAgent(BaseAgent):
    """A simple custom agent that echoes user messages with a prefix."""

    echo_prefix: str = "Echo: "

    def __init__(
        self,
        *,
        name: str | None = None,
        description: str | None = None,
        echo_prefix: str = "Echo: ",
        **kwargs: Any,
    ) -> None:
        super().__init__(
            name=name,
            description=description,
            echo_prefix=echo_prefix,
            **kwargs,
        )

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[False] = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse]: ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[True],
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...

    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Execute the agent.

        Args:
            messages: The message(s) to process.
            stream: If True, return a ResponseStream of updates.
            session: The conversation session (optional).

        Returns:
            When stream=False: An awaitable AgentResponse.
            When stream=True: A ResponseStream with AgentResponseUpdate items and final response support.
        """
        if stream:
            return ResponseStream(
                self._run_stream(messages=messages, session=session, **kwargs),
                finalizer=AgentResponse.from_updates,
            )
        return self._run(messages=messages, session=session, **kwargs)

    async def _run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        normalized_messages = normalize_messages(messages)

        if not normalized_messages:
            response_message = Message(
                role="assistant",
                contents=[Content.from_text("Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")],
            )
        else:
            last_message = normalized_messages[-1]
            echo_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"
            response_message = Message(role="assistant", contents=[Content.from_text(echo_text)])

        if session is not None:
            stored = session.state.setdefault("memory", {}).setdefault("messages", [])
            stored.extend(normalized_messages)
            stored.append(response_message)

        return AgentResponse(messages=[response_message])

    async def _run_stream(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentResponseUpdate]:
        normalized_messages = normalize_messages(messages)

        if not normalized_messages:
            response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
        else:
            last_message = normalized_messages[-1]
            response_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"

        words = response_text.split()
        for i, word in enumerate(words):
            chunk_text = f" {word}" if i > 0 else word
            yield AgentResponseUpdate(
                contents=[Content.from_text(chunk_text)],
                role="assistant",
            )
            await asyncio.sleep(0.1)

        if session is not None:
            complete_response = Message(role="assistant", contents=[Content.from_text(response_text)])
            stored = session.state.setdefault("memory", {}).setdefault("messages", [])
            stored.extend(normalized_messages)
            stored.append(complete_response)

Uso del agente

Si todos los métodos del agente se implementan correctamente, el agente admite operaciones estándar, incluido el streaming a través de ResponseStream:.

stream = echo_agent.run("Stream this response", stream=True, session=echo_agent.create_session())
async for update in stream:
    print(update.text or "", end="", flush=True)
final_response = await stream.get_final_response()

Para obtener más información sobre cómo ejecutar e interactuar con agentes, consulte los tutoriales de introducción al agente.

Pasos siguientes