Megosztás:


Egyedi ügynökök

A Microsoft Agent Framework az osztályból történő örökléssel és a szükséges metódusok megvalósításával támogatja az AIAgent egyéni ügynökök készítését.

Ez a cikk bemutatja, hogyan lehet egyszerű egyéni ügynököt létrehozni, amely a felhasználói bemenetet nagybetűkkel visszaadja. A legtöbb esetben a saját ügynök létrehozása összetettebb logikát és AI-szolgáltatással való integrációt igényel.

Első lépések

Adja hozzá a szükséges NuGet-csomagokat a projekthez.

dotnet add package Microsoft.Agents.AI.Abstractions --prerelease

Egyéni ügynök létrehozása

Az ügynök munkamenete

Egyéni ügynök létrehozásához munkamenetre is szükség van, amely egy beszélgetés állapotának nyomon követésére szolgál, beleértve az üzenetelőzményeket és az ügynök által karbantartandó egyéb állapotokat.

Az első lépések megkönnyítése érdekében örökölhet különböző alaposztályokat, amelyek közös munkamenet-tárolási mechanizmusokat implementálnak.

  1. InMemoryAgentSession - tárolja a csevegési előzményeket a memóriában, és szerializálható A JSON-ra.
  2. ServiceIdAgentSession - nem tárol csevegési előzményeket, de lehetővé teszi, hogy azonosítót társítson a munkamenethez, amely alatt a csevegési előzmények külsőleg tárolhatók.

Ebben a példában az InMemoryAgentSession egyéni munkamenet alaposztályaként fogjuk használni.

internal sealed class CustomAgentSession : InMemoryAgentSession
{
    internal CustomAgentSession() : base() { }
    internal CustomAgentSession(JsonElement serializedSessionState, JsonSerializerOptions? jsonSerializerOptions = null)
        : base(serializedSessionState, jsonSerializerOptions) { }
}

Az Ügynök osztály

Ezután hozza létre magát az ügynökosztályt az AIAgent osztálytól való örökléssel.

internal sealed class UpperCaseParrotAgent : AIAgent
{
}

Munkamenetek létrehozása

A munkamenetek mindig két gyári módszerrel jönnek létre az ügynökosztályon. Ez lehetővé teszi az ügynök számára a munkamenetek létrehozásának és deszerializálásának szabályozását. Az ügynökök ezért bármilyen további állapotot vagy viselkedést csatolhatnak a munkamenethez a létrehozásakor.

Két módszert kell implementálni:

    public override Task<AgentSession> CreateSessionAsync(CancellationToken cancellationToken = default) 
        => Task.FromResult<AgentSession>(new CustomAgentSession());

    public override Task<AgentSession> DeserializeSessionAsync(JsonElement serializedSession, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
        => Task.FromResult<AgentSession>(new CustomAgentSession(serializedSession, jsonSerializerOptions));

Alapvető ügynöklogika

Az ügynök alapvető logikája az, hogy bármilyen bemeneti üzenetet fogad, nagybetűssé konvertálja a szöveget, és válaszüzenetekként adja vissza őket.

A következő metódus hozzáadásával foglalja magában ezt a logikát. A bemeneti üzenetek klónozva vannak, mivel a bemeneti üzenetek különböző aspektusait módosítani kell, hogy érvényes válaszüzenetek legyenek. A szerepkört a következőre kell módosítani: Assistant.

    private static IEnumerable<ChatMessage> CloneAndToUpperCase(IEnumerable<ChatMessage> messages, string agentName) => messages.Select(x =>
        {
            var messageClone = x.Clone();
            messageClone.Role = ChatRole.Assistant;
            messageClone.MessageId = Guid.NewGuid().ToString();
            messageClone.AuthorName = agentName;
            messageClone.Contents = x.Contents.Select(c => c is TextContent tc ? new TextContent(tc.Text.ToUpperInvariant())
            {
                AdditionalProperties = tc.AdditionalProperties,
                Annotations = tc.Annotations,
                RawRepresentation = tc.RawRepresentation
            } : c).ToList();
            return messageClone;
        });

Ügynökvégrehajtási módszerek

Végül implementálnia kell az ügynök futtatásához használt két alapvető módszert: egyet a nem streameléshez, egyet pedig a streameléshez.

Mindkét módszer esetében meg kell győződnie arról, hogy egy munkamenet meg van adva, és ha nem, hozzon létre egy új munkamenetet. Az üzenetek lekérhetők és továbbíthatók a ChatHistoryProvider munkamenetbe. Ha ezt nem teszi meg, a felhasználó nem tud többfordulós beszélgetést folytatni az ügynökkel, és minden futtatás új interakció lesz.

    public override async Task<AgentResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
    {
        session ??= await this.CreateSessionAsync(cancellationToken);

        // Get existing messages from the store
        var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
        var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);

        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();

        // Notify the session of the input and output messages.
        var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
        {
            ResponseMessages = responseMessages
        };
        await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);

        return new AgentResponse
        {
            AgentId = this.Id,
            ResponseId = Guid.NewGuid().ToString(),
            Messages = responseMessages
        };
    }

    public override async IAsyncEnumerable<AgentResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        session ??= await this.CreateSessionAsync(cancellationToken);

        // Get existing messages from the store
        var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
        var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);

        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();

        // Notify the session of the input and output messages.
        var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
        {
            ResponseMessages = responseMessages
        };
        await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);

        foreach (var message in responseMessages)
        {
            yield return new AgentResponseUpdate
            {
                AgentId = this.Id,
                AuthorName = this.DisplayName,
                Role = ChatRole.Assistant,
                Contents = message.Contents,
                ResponseId = Guid.NewGuid().ToString(),
                MessageId = Guid.NewGuid().ToString()
            };
        }
    }

Jótanács

A teljes futtatható példákért tekintse meg a .NET-mintákat .

Az ügynök használata

Ha a AIAgent metódusok mindegyike megfelelően van implementálva, az ügynök egy szabvány AIAgent lesz, és támogatja a szabványos ügynöki műveleteket.

Az ügynökök futtatásával és használatával kapcsolatos további információkért tekintse meg az ügynök első lépéseket ismertető oktatóanyagait.

A Microsoft Agent Framework az osztályból történő örökléssel és a szükséges metódusok megvalósításával támogatja az BaseAgent egyéni ügynökök készítését.

Ez a dokumentum bemutatja, hogyan hozhat létre egy egyszerű egyéni ügynököt, amely egy előtaggal adja vissza a felhasználói bemenetet. A legtöbb esetben a saját ügynök létrehozása összetettebb logikát és AI-szolgáltatással való integrációt igényel.

Első lépések

Adja hozzá a szükséges Python-csomagokat a projekthez.

pip install agent-framework-core --pre

Egyéni ügynök létrehozása

Az ügynökprotokoll

A keretrendszer biztosítja azt a SupportsAgentRun protokollt, amely meghatározza azt a felületet, amelyet minden ügynöknek végre kell hajtania. Az egyéni ügynökök vagy közvetlenül implementálhatják ezt a protokollt, vagy a BaseAgent osztályt kiterjeszthetik a kényelmesebb használat érdekében.

from typing import Any, Literal, overload
from collections.abc import Awaitable, Sequence
from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentSession,
    Message,
    ResponseStream,
    SupportsAgentRun,
)

class MyCustomAgent(SupportsAgentRun):
    """A custom agent that implements the SupportsAgentRun directly."""

    @property
    def id(self) -> str:
        """Returns the ID of the agent."""
        ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[False] = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse]: ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[True],
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...

    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Execute the agent and return either an awaitable response or a ResponseStream."""
        ...

Jótanács

Adjon hozzá @overload aláírásokat a run()-hoz, hogy az IDE-k és a statikus típusellenőrzők következtetni tudjanak a visszatérési típusra a stream alapján (Awaitable[AgentResponse] a stream=False és ResponseStream[AgentResponseUpdate, AgentResponse] a stream=True esetén).

A BaseAgent használata

Az ajánlott módszer az BaseAgent osztály kiterjesztése, amely általános funkciókat biztosít, és leegyszerűsíti a megvalósítást:

import asyncio
from collections.abc import AsyncIterable, Awaitable, Sequence
from typing import Any, Literal, overload

from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentSession,
    BaseAgent,
    Content,
    Message,
    ResponseStream,
    normalize_messages,
)


class EchoAgent(BaseAgent):
    """A simple custom agent that echoes user messages with a prefix."""

    echo_prefix: str = "Echo: "

    def __init__(
        self,
        *,
        name: str | None = None,
        description: str | None = None,
        echo_prefix: str = "Echo: ",
        **kwargs: Any,
    ) -> None:
        super().__init__(
            name=name,
            description=description,
            echo_prefix=echo_prefix,
            **kwargs,
        )

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[False] = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse]: ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[True],
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...

    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Execute the agent.

        Args:
            messages: The message(s) to process.
            stream: If True, return a ResponseStream of updates.
            session: The conversation session (optional).

        Returns:
            When stream=False: An awaitable AgentResponse.
            When stream=True: A ResponseStream with AgentResponseUpdate items and final response support.
        """
        if stream:
            return ResponseStream(
                self._run_stream(messages=messages, session=session, **kwargs),
                finalizer=AgentResponse.from_updates,
            )
        return self._run(messages=messages, session=session, **kwargs)

    async def _run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        normalized_messages = normalize_messages(messages)

        if not normalized_messages:
            response_message = Message(
                role="assistant",
                contents=[Content.from_text("Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")],
            )
        else:
            last_message = normalized_messages[-1]
            echo_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"
            response_message = Message(role="assistant", contents=[Content.from_text(echo_text)])

        if session is not None:
            stored = session.state.setdefault("memory", {}).setdefault("messages", [])
            stored.extend(normalized_messages)
            stored.append(response_message)

        return AgentResponse(messages=[response_message])

    async def _run_stream(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentResponseUpdate]:
        normalized_messages = normalize_messages(messages)

        if not normalized_messages:
            response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
        else:
            last_message = normalized_messages[-1]
            response_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"

        words = response_text.split()
        for i, word in enumerate(words):
            chunk_text = f" {word}" if i > 0 else word
            yield AgentResponseUpdate(
                contents=[Content.from_text(chunk_text)],
                role="assistant",
            )
            await asyncio.sleep(0.1)

        if session is not None:
            complete_response = Message(role="assistant", contents=[Content.from_text(response_text)])
            stored = session.state.setdefault("memory", {}).setdefault("messages", [])
            stored.extend(normalized_messages)
            stored.append(complete_response)

Az ügynök használata

Ha az ügynökmetóciók mindegyike megfelelően van implementálva, az ügynök támogatja a szabványos műveleteket, beleértve a streamelést a következő módon ResponseStream:

stream = echo_agent.run("Stream this response", stream=True, session=echo_agent.create_session())
async for update in stream:
    print(update.text or "", end="", flush=True)
final_response = await stream.get_final_response()

Az ügynökök futtatásával és használatával kapcsolatos további információkért tekintse meg az ügynök első lépéseket ismertető oktatóanyagait.

Következő lépések