Bagikan melalui


Agen Khusus

Microsoft Agent Framework mendukung pembangunan agen kustom dengan mewarisi dari AIAgent kelas dan menerapkan metode yang diperlukan.

Artikel ini menunjukkan cara membangun agen kustom sederhana yang mengulang kembali input pengguna dengan huruf besar. Dalam kebanyakan kasus, membangun agen Anda sendiri akan melibatkan logika dan integrasi yang lebih kompleks dengan layanan AI.

Memulai Langkah Awal

Tambahkan paket NuGet yang diperlukan ke proyek Anda.

dotnet add package Microsoft.Agents.AI.Abstractions --prerelease

Membuat Agen Kustom

Sesi Agen

Untuk membuat agen kustom, Anda juga memerlukan sesi, yang digunakan untuk melacak status percakapan tunggal, termasuk riwayat pesan, dan status lain yang perlu dipertahankan agen.

Untuk memudahkan Anda memulai, Anda dapat mewarisi berbagai kelas dasar yang menerapkan mekanisme penyimpanan sesi umum.

  1. InMemoryAgentSession - menyimpan riwayat obrolan dalam memori dan dapat diserialisasikan ke JSON.
  2. ServiceIdAgentSession - tidak menyimpan riwayat obrolan apa pun, tetapi memungkinkan Anda untuk mengaitkan ID dengan sesi, di mana riwayat obrolan dapat disimpan secara eksternal.

Untuk contoh ini, Anda akan menggunakan InMemoryAgentSession sebagai kelas dasar untuk sesi kustom.

internal sealed class CustomAgentSession : InMemoryAgentSession
{
    internal CustomAgentSession() : base() { }
    internal CustomAgentSession(JsonElement serializedSessionState, JsonSerializerOptions? jsonSerializerOptions = null)
        : base(serializedSessionState, jsonSerializerOptions) { }
}

Kelas Agen

Selanjutnya, buat kelas agen itu sendiri dengan mewarisi dari kelas AIAgent.

internal sealed class UpperCaseParrotAgent : AIAgent
{
}

Membuat sesi

Sesi selalu dibuat melalui dua metode factory di kelas agen. Hal ini memungkinkan agen mengontrol bagaimana sesi dibuat dan dideserialisasi. Oleh karena itu, agen dapat melampirkan status atau perilaku tambahan yang diperlukan ke sesi saat dibangun.

Dua metode diperlukan untuk diimplementasikan:

    public override Task<AgentSession> CreateSessionAsync(CancellationToken cancellationToken = default) 
        => Task.FromResult<AgentSession>(new CustomAgentSession());

    public override Task<AgentSession> DeserializeSessionAsync(JsonElement serializedSession, JsonSerializerOptions? jsonSerializerOptions = null, CancellationToken cancellationToken = default)
        => Task.FromResult<AgentSession>(new CustomAgentSession(serializedSession, jsonSerializerOptions));

Logika agen inti

Logika inti agen adalah mengambil pesan input apa pun, mengonversi teks mereka menjadi huruf besar, dan mengembalikannya sebagai pesan respons.

Tambahkan metode berikut untuk memuat logika ini. Pesan input dikloning, karena berbagai aspek pesan input harus dimodifikasi menjadi pesan respons yang valid. Misalnya, peran harus diubah menjadi Assistant.

    private static IEnumerable<ChatMessage> CloneAndToUpperCase(IEnumerable<ChatMessage> messages, string agentName) => messages.Select(x =>
        {
            var messageClone = x.Clone();
            messageClone.Role = ChatRole.Assistant;
            messageClone.MessageId = Guid.NewGuid().ToString();
            messageClone.AuthorName = agentName;
            messageClone.Contents = x.Contents.Select(c => c is TextContent tc ? new TextContent(tc.Text.ToUpperInvariant())
            {
                AdditionalProperties = tc.AdditionalProperties,
                Annotations = tc.Annotations,
                RawRepresentation = tc.RawRepresentation
            } : c).ToList();
            return messageClone;
        });

Metode eksekusi agen

Terakhir, Anda perlu menerapkan dua metode inti yang digunakan untuk menjalankan agen: satu untuk non-streaming dan satu untuk streaming.

Untuk kedua metode, Anda perlu memastikan bahwa sesi disediakan, dan jika tidak, buat sesi baru. Pesan dapat diambil dan diteruskan ke ChatHistoryProvider pada sesi. Jika Anda tidak melakukan ini, pengguna tidak akan dapat melakukan percakapan bergantian dengan agen dan setiap sesi akan menjadi interaksi baru.

    public override async Task<AgentResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
    {
        session ??= await this.CreateSessionAsync(cancellationToken);

        // Get existing messages from the store
        var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
        var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);

        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();

        // Notify the session of the input and output messages.
        var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
        {
            ResponseMessages = responseMessages
        };
        await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);

        return new AgentResponse
        {
            AgentId = this.Id,
            ResponseId = Guid.NewGuid().ToString(),
            Messages = responseMessages
        };
    }

    public override async IAsyncEnumerable<AgentResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentSession? session = null, AgentRunOptions? options = null, [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        session ??= await this.CreateSessionAsync(cancellationToken);

        // Get existing messages from the store
        var invokingContext = new ChatHistoryProvider.InvokingContext(messages);
        var storeMessages = await typedSession.ChatHistoryProvider.InvokingAsync(invokingContext, cancellationToken);

        List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();

        // Notify the session of the input and output messages.
        var invokedContext = new ChatHistoryProvider.InvokedContext(messages, storeMessages)
        {
            ResponseMessages = responseMessages
        };
        await typedSession.ChatHistoryProvider.InvokedAsync(invokedContext, cancellationToken);

        foreach (var message in responseMessages)
        {
            yield return new AgentResponseUpdate
            {
                AgentId = this.Id,
                AuthorName = this.DisplayName,
                Role = ChatRole.Assistant,
                Contents = message.Contents,
                ResponseId = Guid.NewGuid().ToString(),
                MessageId = Guid.NewGuid().ToString()
            };
        }
    }

Petunjuk / Saran

Lihat sampel .NET untuk contoh lengkap yang dapat dijalankan.

Menggunakan Agen

AIAgent Jika semua metode diimplementasikan dengan benar, agen akan menjadi AIAgent standar dan mendukung operasi agen standar.

Untuk informasi selengkapnya tentang cara menjalankan dan berinteraksi dengan agen, lihat tutorial Memulai Agen.

Microsoft Agent Framework mendukung pembangunan agen kustom dengan mewarisi dari BaseAgent kelas dan menerapkan metode yang diperlukan.

Dokumen ini menunjukkan cara membuat agen kustom sederhana yang mengulangi input pengguna dengan awalan. Dalam kebanyakan kasus, membangun agen Anda sendiri akan melibatkan logika dan integrasi yang lebih kompleks dengan layanan AI.

Memulai Langkah Awal

Tambahkan paket Python yang diperlukan ke proyek Anda.

pip install agent-framework-core --pre

Membuat Agen Kustom

Protokol Agen

Kerangka kerja menyediakan SupportsAgentRun protokol yang menentukan antarmuka yang harus diterapkan semua agen. Agen kustom dapat menerapkan protokol ini secara langsung atau memperluas BaseAgent kelas untuk kenyamanan.

from typing import Any, Literal, overload
from collections.abc import Awaitable, Sequence
from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentSession,
    Message,
    ResponseStream,
    SupportsAgentRun,
)

class MyCustomAgent(SupportsAgentRun):
    """A custom agent that implements the SupportsAgentRun directly."""

    @property
    def id(self) -> str:
        """Returns the ID of the agent."""
        ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[False] = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse]: ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[True],
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...

    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Execute the agent and return either an awaitable response or a ResponseStream."""
        ...

Petunjuk / Saran

Tambahkan @overload tanda tangan ke run() sehingga IDEs dan pemeriksa jenis statis menyimpulkan jenis pengembalian berdasarkan stream (Awaitable[AgentResponse] untuk stream=False dan ResponseStream[AgentResponseUpdate, AgentResponse] untuk stream=True).

Menggunakan BaseAgent

Pendekatan yang direkomendasikan adalah memperluas BaseAgent kelas, yang menyediakan fungsionalitas umum dan menyederhanakan implementasi:

import asyncio
from collections.abc import AsyncIterable, Awaitable, Sequence
from typing import Any, Literal, overload

from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentSession,
    BaseAgent,
    Content,
    Message,
    ResponseStream,
    normalize_messages,
)


class EchoAgent(BaseAgent):
    """A simple custom agent that echoes user messages with a prefix."""

    echo_prefix: str = "Echo: "

    def __init__(
        self,
        *,
        name: str | None = None,
        description: str | None = None,
        echo_prefix: str = "Echo: ",
        **kwargs: Any,
    ) -> None:
        super().__init__(
            name=name,
            description=description,
            echo_prefix=echo_prefix,
            **kwargs,
        )

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[False] = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse]: ...

    @overload
    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: Literal[True],
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> ResponseStream[AgentResponseUpdate, AgentResponse]: ...

    def run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        stream: bool = False,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
        """Execute the agent.

        Args:
            messages: The message(s) to process.
            stream: If True, return a ResponseStream of updates.
            session: The conversation session (optional).

        Returns:
            When stream=False: An awaitable AgentResponse.
            When stream=True: A ResponseStream with AgentResponseUpdate items and final response support.
        """
        if stream:
            return ResponseStream(
                self._run_stream(messages=messages, session=session, **kwargs),
                finalizer=AgentResponse.from_updates,
            )
        return self._run(messages=messages, session=session, **kwargs)

    async def _run(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AgentResponse:
        normalized_messages = normalize_messages(messages)

        if not normalized_messages:
            response_message = Message(
                role="assistant",
                contents=[Content.from_text("Hello! I'm a custom echo agent. Send me a message and I'll echo it back.")],
            )
        else:
            last_message = normalized_messages[-1]
            echo_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"
            response_message = Message(role="assistant", contents=[Content.from_text(echo_text)])

        if session is not None:
            stored = session.state.setdefault("memory", {}).setdefault("messages", [])
            stored.extend(normalized_messages)
            stored.append(response_message)

        return AgentResponse(messages=[response_message])

    async def _run_stream(
        self,
        messages: str | Message | Sequence[str | Message] | None = None,
        *,
        session: AgentSession | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentResponseUpdate]:
        normalized_messages = normalize_messages(messages)

        if not normalized_messages:
            response_text = "Hello! I'm a custom echo agent. Send me a message and I'll echo it back."
        else:
            last_message = normalized_messages[-1]
            response_text = f"{self.echo_prefix}{last_message.text}" if last_message.text else f"{self.echo_prefix}[Non-text message received]"

        words = response_text.split()
        for i, word in enumerate(words):
            chunk_text = f" {word}" if i > 0 else word
            yield AgentResponseUpdate(
                contents=[Content.from_text(chunk_text)],
                role="assistant",
            )
            await asyncio.sleep(0.1)

        if session is not None:
            complete_response = Message(role="assistant", contents=[Content.from_text(response_text)])
            stored = session.state.setdefault("memory", {}).setdefault("messages", [])
            stored.extend(normalized_messages)
            stored.append(complete_response)

Menggunakan Agen

Jika semua metode agen diimplementasikan dengan benar, agen mendukung operasi standar, termasuk streaming melalui ResponseStream:

stream = echo_agent.run("Stream this response", stream=True, session=echo_agent.create_session())
async for update in stream:
    print(update.text or "", end="", flush=True)
final_response = await stream.get_final_response()

Untuk informasi selengkapnya tentang cara menjalankan dan berinteraksi dengan agen, lihat tutorial Memulai Agen.

Langkah selanjutnya