Condividi tramite


Agenti personalizzati

Microsoft Agent Framework supporta la creazione di agenti personalizzati ereditando dalla classe AIAgent e implementando i metodi necessari.

Questo articolo illustra come creare un semplice agente personalizzato che ripete l'input dell'utente in lettere maiuscole. Nella maggior parte dei casi, la creazione di un agente implica una logica e un'integrazione più complesse con un servizio di intelligenza artificiale.

Come iniziare

Aggiungere i pacchetti NuGet necessari al progetto.

dotnet add package Microsoft.Agents.AI.Abstractions --prerelease

Creare un agente personalizzato

Sessione dell'agente

Per creare un agente personalizzato è necessaria anche una sessione, usata per tenere traccia dello stato di una singola conversazione, inclusa la cronologia dei messaggi e qualsiasi altro stato che l'agente deve gestire.

Per iniziare, è possibile ereditare da varie classi di base che implementano meccanismi comuni di archiviazione delle sessioni.

  1. InMemoryAgentSession : archivia la cronologia delle chat in memoria e può essere serializzata in JSON.
  2. ServiceIdAgentSession - non archivia alcuna cronologia delle chat, ma consente di associare un ID alla sessione, in cui la cronologia della chat può essere archiviata esternamente.

Per questo esempio, utilizzerai la classe di base InMemoryAgentSession per la sessione personalizzata.

internal sealed class CustomAgentSession : InMemoryAgentSession
{
    internal CustomAgentSession() : base() { }
    internal CustomAgentSession(JsonElement serializedSessionState, JsonSerializerOptions? jsonSerializerOptions = null)
        : base(serializedSessionState, jsonSerializerOptions) { }
}

Classe Agent

Creare quindi la classe dell'agente stessa ereditando dalla AIAgent classe .

internal sealed class UpperCaseParrotAgent : AIAgent
{
}

Creazione di sessioni

Le sessioni vengono sempre create tramite due metodi factory nella classe agent. Ciò consente all'agente di controllare la modalità di creazione e deserializzazione delle sessioni. Gli agenti possono quindi collegare qualsiasi stato o comportamento aggiuntivo necessario alla sessione durante la costruzione.

È necessario implementare due metodi:

    public override Task<AgentSession> CreateSessionAsync(CancellationToken cancellationToken = default) 
        => Task.FromResult<AgentSession>(new CustomAgentSession());

    public override Task<AgentSession> DeserializeSessionAsync(JsonElement serializedSession, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
        => Task.FromResult<AgentSession>(new CustomAgentSession(serializedSession, jsonSerializerOptions));

Logica dell'agente principale

La logica di base dell'agente consiste nell'accettare tutti i messaggi di input, convertire il testo in maiuscolo e restituirli come messaggi di risposta.

Aggiungere il metodo seguente per contenere questa logica. I messaggi di input vengono clonati, poiché diversi aspetti dei messaggi di input devono essere modificati per essere messaggi di risposta validi. Ad esempio, il ruolo deve essere modificato in Assistant.

    private static IEnumerable<ChatMessage> CloneAndToUpperCase(IEnumerable<ChatMessage> messages, string agentName) => messages.Select(x =>
        {
            var messageClone = x.Clone();
            messageClone.Role = ChatRole.Assistant;
            messageClone.MessageId = Guid.NewGuid().ToString();
            messageClone.AuthorName = agentName;
            messageClone.Contents = x.Contents.Select(c => c is TextContent tc ? new TextContent(tc.Text.ToUpperInvariant())
            {
                AdditionalProperties = tc.AdditionalProperties,
                Annotations = tc.Annotations,
                RawRepresentation = tc.RawRepresentation
            } : c).ToList();
            return messageClone;
        });

Metodi di esecuzione dell'agente

Infine, è necessario implementare i due metodi di base usati per eseguire l'agente: uno per il non streaming e uno per lo streaming.

Per entrambi i metodi, è necessario assicurarsi che venga fornita una sessione e, in caso contrario, creare una nuova sessione. I messaggi possono essere recuperati e passati a ChatHistoryProvider nella sessione. Se non si esegue questa operazione, l'utente non sarà in grado di avere una conversazione a più turni con l'agente e ogni esecuzione sarà una nuova interazione.

    public override async Task<AgentResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
    {
        session ??= await this.CreateSessionAsync(cancellationToken);

        // Get existing messages from the store
        var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
        var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);

        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();

        // Notify the session of the input and output messages.
        var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
        {
            ResponseMessages = responseMessages
        };
        await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);

        return new AgentResponse
        {
            AgentId = this.Id,
            ResponseId = Guid.NewGuid().ToString(),
            Messages = responseMessages
        };
    }

    public override async IAsyncEnumerable<AgentResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        session ??= await this.CreateSessionAsync(cancellationToken);

        // Get existing messages from the store
        var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
        var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);

        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();

        // Notify the session of the input and output messages.
        var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
        {
            ResponseMessages = responseMessages
        };
        await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);

        foreach (var message in responseMessages)
        {
            yield return new AgentResponseUpdate
            {
                AgentId = this.Id,
                AuthorName = this.DisplayName,
                Role = ChatRole.Assistant,
                Contents = message.Contents,
                ResponseId = Guid.NewGuid().ToString(),
                MessageId = Guid.NewGuid().ToString()
            };
        }
    }

Uso dell'agente

Se i AIAgent metodi sono tutti implementati correttamente, l'agente sarà uno standard AIAgent e supporterebbe le operazioni dell'agente standard.

Per altre informazioni su come eseguire e interagire con gli agenti, vedere le esercitazioni introduttive su Agent.

Microsoft Agent Framework supporta la creazione di agenti personalizzati ereditando dalla classe BaseAgent e implementando i metodi necessari.

Questo documento illustra come creare un semplice agente personalizzato che restituisce l'input dell'utente con un prefisso. Nella maggior parte dei casi, la creazione di un agente implica una logica e un'integrazione più complesse con un servizio di intelligenza artificiale.

Come iniziare

Aggiungere i pacchetti Python necessari al progetto.

pip install agent-framework-core --pre

Creare un agente personalizzato

Protocollo agente

Il framework fornisce il SupportsAgentRun protocollo che definisce l'interfaccia che tutti gli agenti devono implementare. Gli agenti personalizzati possono implementare questo protocollo direttamente o estendere la BaseAgent classe per praticità.

from typing import Any, Literal, overload
from collections.abc import Awaitable, Sequence
from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentSession,
    Message,
    ResponseStream,
    SupportsAgentRun,
)

class MyCustomAgent(SupportsAgentRun):
    """A custom agent that implements the SupportsAgentRun directly."""

    @property
    def id(self) -> str:
        """Returns the ID of the agent."""
        ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[False] = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse]: ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[True],
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...

    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Execute the agent and return either an awaitable response or a ResponseStream."""
        ...

Suggerimento

Aggiungere @overload signature a run() in modo che gli IDE e i controlli dei tipi statici deducano il tipo restituito in base a stream (Awaitable[AgentResponse] per stream=False e ResponseStream[AgentResponseUpdate, AgentResponse] per stream=True).

Uso di BaseAgent

L'approccio consigliato consiste nell'estendere la BaseAgent classe , che fornisce funzionalità comuni e semplifica l'implementazione:

import asyncio
from collections.abc import AsyncIterable, Awaitable, Sequence
from typing import Any, Literal, overload

from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentSession,
    BaseAgent,
    Content,
    Message,
    ResponseStream,
    normalize_messages,
)


class EchoAgent(BaseAgent):
    """A simple custom agent that echoes user messages with a prefix."""

    echo_prefix: str = "Echo: "

    def __init__(
        self,
        *,
        name: str | None = None,
        description: str | None = None,
        echo_prefix: str = "Echo: ",
        **kwargs: Any,
    ) -> None:
        super().__init__(
            name=name,
            description=description,
            echo_prefix=echo_prefix,
            **kwargs,
        )

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[False] = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse]: ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[True],
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...

    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Execute the agent.

        Args:
            messages: The message(s) to process.
            stream: If True, return a ResponseStream of updates.
            session: The conversation session (optional).

        Returns:
            When stream=False: An awaitable AgentResponse.
            When stream=True: A ResponseStream with AgentResponseUpdate items and final response support.
        """
        if stream:
            return ResponseStream(
                self._run_stream(messages=messages, session=session, **kwargs),
                finalizer=AgentResponse.from_updates,
            )
        return self._run(messages=messages, session=session, **kwargs)

    async def _run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        normalized_messages = normalize_messages(messages)

        if not normalized_messages:
            response_message = Message(
                role="assistant",
                contents=[Content.from_text("Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")],
            )
        else:
            last_message = normalized_messages[-1]
            echo_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"
            response_message = Message(role="assistant", contents=[Content.from_text(echo_text)])

        if session is not None:
            stored = session.state.setdefault("memory", {}).setdefault("messages", [])
            stored.extend(normalized_messages)
            stored.append(response_message)

        return AgentResponse(messages=[response_message])

    async def _run_stream(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentResponseUpdate]:
        normalized_messages = normalize_messages(messages)

        if not normalized_messages:
            response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
        else:
            last_message = normalized_messages[-1]
            response_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"

        words = response_text.split()
        for i, word in enumerate(words):
            chunk_text = f" {word}" if i > 0 else word
            yield AgentResponseUpdate(
                contents=[Content.from_text(chunk_text)],
                role="assistant",
            )
            await asyncio.sleep(0.1)

        if session is not None:
            complete_response = Message(role="assistant", contents=[Content.from_text(response_text)])
            stored = session.state.setdefault("memory", {}).setdefault("messages", [])
            stored.extend(normalized_messages)
            stored.append(complete_response)

Uso dell'agente

Se tutti i metodi dell'agente vengono implementati correttamente, l'agente supporta le operazioni standard, incluso lo streaming tramite ResponseStream:

stream = echo_agent.run("Stream this response", stream=True, session=echo_agent.create_session())
async for update in stream:
    print(update.text or "", end="", flush=True)
final_response = await stream.get_final_response()

Per altre informazioni su come eseguire e interagire con gli agenti, vedere le esercitazioni introduttive su Agent.

Passaggi successivi