Bagikan melalui


Menyimpan Riwayat Obrolan di Penyimpanan Pihak Ke-3

Tutorial ini menunjukkan cara menyimpan riwayat obrolan agen di penyimpanan eksternal dengan mengimplementasikan ChatHistoryProvider khusus dan menggunakannya dengan ChatClientAgent.

Secara default, saat menggunakan ChatClientAgent, riwayat obrolan disimpan baik dalam memori di AgentSession objek atau layanan inferensi yang mendasar, jika layanan mendukungnya.

Di mana layanan tidak mengharuskan riwayat obrolan disimpan dalam layanan, dimungkinkan untuk menyediakan penyimpanan kustom untuk riwayat obrolan yang bertahan alih-alih mengandalkan perilaku dalam memori default.

Prasyarat

Untuk prasyarat, lihat langkah Membuat dan menjalankan agen sederhana dalam tutorial ini.

Menginstal paket NuGet

Untuk menggunakan Microsoft Agent Framework dengan Azure OpenAI, Anda perlu menginstal paket NuGet berikut:

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.OpenAI --prerelease

Selain itu, Anda akan menggunakan penyimpanan vektor dalam memori untuk menyimpan pesan obrolan.

dotnet add package Microsoft.SemanticKernel.Connectors.InMemory --prerelease

Membuat ChatHistoryProvider kustom

Untuk membuat kustom ChatHistoryProvider, Anda perlu menerapkan kelas abstrak ChatHistoryProvider dan menyediakan implementasi untuk metode yang diperlukan.

Metode penyimpanan dan pengambilan pesan

Metode terpenting untuk diterapkan adalah:

  • InvokingAsync - dipanggil pada awal pemanggilan agen untuk mengambil pesan dari sistem penyimpanan yang akan dijadikan konteks.
  • InvokedAsync - dipanggil pada akhir pemanggilan agen untuk menambahkan pesan baru ke toko.

InvokingAsync harus mengembalikan pesan dalam urutan kronologis naik (terlama terlebih dahulu). Semua pesan yang dikembalikan olehnya akan digunakan oleh ChatClientAgent saat melakukan panggilan ke sistem dasar IChatClient. Oleh karena itu penting bahwa metode ini mempertimbangkan batas model yang mendasar, dan hanya mengembalikan pesan sebanyak yang dapat ditangani oleh model.

Logika pengurangan riwayat obrolan apa pun, seperti ringkasan atau pemangkasan, harus dilakukan sebelum mengembalikan pesan dari InvokingAsync.

Serialization

ChatHistoryProvider instance dibuat dan dilampirkan ke AgentSession saat sesi dibuat, dan ketika sesi dilanjutkan dari status terserialisasi.

Meskipun pesan aktual yang membentuk riwayat obrolan disimpan secara eksternal, ChatHistoryProvider instans mungkin perlu menyimpan kunci atau status lain untuk mengidentifikasi riwayat obrolan di penyimpanan eksternal.

Untuk memungkinkan persistensi sesi, Anda perlu menerapkan metode Serialize dari kelas ChatHistoryProvider. Metode ini harus mengembalikan JsonElement yang berisi status yang diperlukan untuk mengembalikan penyedia nanti. Saat deserialisasi, kerangka kerja agen akan meneruskan status berseri ini ke ChatHistoryProviderFactory, memungkinkan Anda menggunakannya untuk membuat ulang penyedia.

Contoh implementasi ChatHistoryProvider

Contoh implementasi berikut menyimpan pesan obrolan di penyimpanan vektor.

InvokedAsync melakukan penyisipan atau pembaruan pesan ke penyimpanan vektor, menggunakan kunci unik untuk setiap pesan. Ini menyimpan pesan permintaan dan pesan respons dari konteks pemanggilan.

InvokingAsync mengambil pesan untuk sesi saat ini dari penyimpanan vektor, mengurutkannya berdasarkan tanda waktu, dan mengembalikannya dalam urutan kronologis naik (terlama terlebih dahulu).

Ketika pemanggilan pertama terjadi, penyimpanan menghasilkan kunci unik untuk sesi, yang kemudian digunakan untuk mengidentifikasi riwayat obrolan di penyimpanan vektor untuk panggilan berikutnya.

Kunci unik disimpan dalam properti SessionDbKey, yang diserialisasi menggunakan metode Serialize dan dideserialisasi melalui konstruktor yang menerima JsonElement. Oleh karena itu, kunci ini akan dipertahankan sebagai bagian AgentSession dari status, memungkinkan sesi dilanjutkan nanti dan terus menggunakan riwayat obrolan yang sama.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.VectorData;
using Microsoft.SemanticKernel.Connectors.InMemory;

internal sealed class VectorChatHistoryProvider : ChatHistoryProvider
{
    private readonly VectorStore _vectorStore;

    public VectorChatHistoryProvider(
        VectorStore vectorStore,
        JsonElement serializedStoreState,
        JsonSerializerOptions? jsonSerializerOptions = null)
    {
        this._vectorStore = vectorStore ?? throw new ArgumentNullException(nameof(vectorStore));
        if (serializedStoreState.ValueKind is JsonValueKind.String)
        {
            this.SessionDbKey = serializedStoreState.Deserialize<string>();
        }
    }

    public string? SessionDbKey { get; private set; }

    public override async ValueTask<IEnumerable<ChatMessage>> InvokingAsync(
        InvokingContext context,
        CancellationToken cancellationToken = default)
    {
        if (this.SessionDbKey is null)
        {
            // No session key yet, so no messages to retrieve
            return [];
        }

        var collection = this._vectorStore.GetCollection<string, ChatHistoryItem>("ChatHistory");
        await collection.EnsureCollectionExistsAsync(cancellationToken);
        var records = collection
            .GetAsync(
                x => x.SessionId == this.SessionDbKey, 
                10,
                new() { OrderBy = x => x.Descending(y => y.Timestamp) },
                cancellationToken);

        List<ChatMessage> messages = [];
        await foreach (var record in records)
        {
            messages.Add(JsonSerializer.Deserialize<ChatMessage>(record.SerializedMessage!)!);
        }

        // Reverse to return in ascending chronological order (oldest first)
        messages.Reverse();
        return messages;
    }

    public override async ValueTask InvokedAsync(
        InvokedContext context,
        CancellationToken cancellationToken = default)
    {
        // Don't store messages if the request failed.
        if (context.InvokeException is not null)
        {
            return;
        }

        this.SessionDbKey ??= Guid.NewGuid().ToString("N");

        var collection = this._vectorStore.GetCollection<string, ChatHistoryItem>("ChatHistory");
        await collection.EnsureCollectionExistsAsync(cancellationToken);

        // Store request messages, response messages, and optionally AIContextProvider messages
        var allNewMessages = context.RequestMessages
            .Concat(context.AIContextProviderMessages ?? [])
            .Concat(context.ResponseMessages ?? []);

        await collection.UpsertAsync(allNewMessages.Select(x => new ChatHistoryItem()
        {
            Key = this.SessionDbKey + x.MessageId,
            Timestamp = DateTimeOffset.UtcNow,
            SessionId = this.SessionDbKey,
            SerializedMessage = JsonSerializer.Serialize(x),
            MessageText = x.Text
        }), cancellationToken);
    }

    public override JsonElement Serialize(JsonSerializerOptions? jsonSerializerOptions = null) =>
        // We have to serialize the session id, so that on deserialization you can retrieve the messages using the same session id.
        JsonSerializer.SerializeToElement(this.SessionDbKey);

    private sealed class ChatHistoryItem
    {
        [VectorStoreKey]
        public string? Key { get; set; }
        [VectorStoreData]
        public string? SessionId { get; set; }
        [VectorStoreData]
        public DateTimeOffset? Timestamp { get; set; }
        [VectorStoreData]
        public string? SerializedMessage { get; set; }
        [VectorStoreData]
        public string? MessageText { get; set; }
    }
}

Menggunakan ChatHistoryProvider kustom dengan ChatClientAgent

Untuk menggunakan kustom ChatHistoryProvider, Anda perlu menyediakan ChatHistoryProviderFactory saat membuat agen. Fabrik ini memungkinkan agen untuk membuat instance baru yang diinginkan ChatHistoryProvider untuk setiap sesi.

Saat membuat ChatClientAgent, Anda dapat menyediakan ChatClientAgentOptions objek yang memungkinkan Anda menyediakan ChatHistoryProviderFactory selain semua opsi agen lainnya.

Fungsi pabrik adalah fungsi asinkron yang menerima objek konteks dan token pembatalan, dan mengembalikan ValueTask<ChatHistoryProvider>.

using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Extensions.VectorData;
using Microsoft.SemanticKernel.Connectors.InMemory;

// Create a vector store to store the chat messages in.
VectorStore vectorStore = new InMemoryVectorStore();

AIAgent agent = new AzureOpenAIClient(
    new Uri("https://<myresource>.openai.azure.com"),
    new AzureCliCredential())
     .GetChatClient("gpt-4o-mini")
     .AsAIAgent(new ChatClientAgentOptions
     {
         Name = "Joker",
         ChatOptions = new() { Instructions = "You are good at telling jokes." },
         ChatHistoryProviderFactory = (ctx, ct) => new ValueTask<ChatHistoryProvider>(
             // Create a new chat history provider for this agent that stores the messages in a vector store.
             // Each session must get its own copy of the VectorChatHistoryProvider, since the provider
             // also contains the id that the session is stored under.
             new VectorChatHistoryProvider(
                vectorStore,
                ctx.SerializedState,
                ctx.JsonSerializerOptions))
     });

// Start a new session for the agent conversation.
AgentSession session = await agent.GetNewSessionAsync();

// Run the agent with the session
var response = await agent.RunAsync("Tell me a joke about a pirate.", session);

// The session state can be serialized for storage
JsonElement serializedSession = session.Serialize();

// Later, deserialize the session to resume the conversation
AgentSession resumedSession = await agent.DeserializeSessionAsync(serializedSession);

Tutorial ini menunjukkan cara menyimpan riwayat obrolan agen di penyimpanan eksternal dengan mengimplementasikan ChatMessageStore khusus dan menggunakannya dengan ChatAgent.

Secara default, saat menggunakan ChatAgent, riwayat obrolan disimpan baik dalam memori di AgentThread objek atau layanan inferensi yang mendasar, jika layanan mendukungnya.

Jika layanan tidak memerlukan atau tidak mampu riwayat obrolan disimpan dalam layanan, dimungkinkan untuk menyediakan penyimpanan kustom untuk mempertahankan riwayat obrolan alih-alih mengandalkan perilaku dalam memori default.

Prasyarat

Untuk prasyarat, lihat langkah Membuat dan menjalankan agen sederhana dalam tutorial ini.

Membuat sebuah penyimpanan pesan obrolan yang khusus

Untuk membuat ChatMessageStore khusus, Anda perlu menerapkan protokol ChatMessageStore dan menyediakan implementasi untuk metode yang diperlukan.

Metode penyimpanan dan pengambilan pesan

Metode terpenting untuk diterapkan adalah:

  • add_messages - dipanggil untuk menambahkan pesan baru ke toko.
  • list_messages - dipanggil untuk mengambil pesan dari toko.

list_messages harus mengembalikan pesan dalam urutan kronologis naik. Semua pesan yang dikembalikan olehnya akan digunakan oleh ChatAgent saat melakukan panggilan ke klien obrolan yang mendasar. Oleh karena itu penting bahwa metode ini mempertimbangkan batas model yang mendasar, dan hanya mengembalikan pesan sebanyak yang dapat ditangani oleh model.

Logika pengurangan riwayat obrolan apa pun, seperti ringkasan atau pemangkasan, harus dilakukan sebelum mengembalikan pesan dari list_messages.

Serialization

ChatMessageStore instance dibuat dan dilampirkan ke AgentThread saat utas dibuat, dan ketika utas dilanjutkan dari status ter-serialisasi.

Meskipun pesan aktual yang membentuk riwayat obrolan disimpan secara eksternal, ChatMessageStore instans mungkin perlu menyimpan kunci atau status lain untuk mengidentifikasi riwayat obrolan di penyimpanan eksternal.

Untuk memungkinkan keberlanjutan utas, Anda perlu menerapkan serialize_state dan deserialize_state metode dari ChatMessageStore protokol. Metode ini memungkinkan status penyimpanan toko dipertahankan dan dipulihkan saat melanjutkan thread.

Contoh implementasi ChatMessageStore

Contoh implementasi berikut menyimpan pesan obrolan di Redis menggunakan struktur data Redis Lists.

Dalam add_messages, ia menyimpan pesan di Redis menggunakan RPUSH untuk menambahkannya ke akhir daftar dalam urutan kronologis.

list_messages mengambil pesan dari utas yang sedang berlangsung di Redis menggunakan LRANGE, dan mengembalikannya dalam urutan kronologis menaik.

Ketika pesan pertama diterima, penyimpanan menghasilkan kunci unik untuk utas, yang kemudian digunakan untuk mengidentifikasi riwayat obrolan di Redis untuk panggilan berikutnya.

Kunci unik dan konfigurasi lainnya disimpan dan dapat diserialisasi serta dideserialisasi menggunakan metode serialize_state dan deserialize_state. Oleh karena itu, keadaan ini akan dipertahankan sebagai bagian dari AgentThread status, memungkinkan thread dapat dilanjutkan kembali di kemudian waktu dan terus menggunakan riwayat obrolan yang sama.

from collections.abc import Sequence
from typing import Any
from uuid import uuid4
from pydantic import BaseModel
import json
import redis.asyncio as redis
from agent_framework import ChatMessage


class RedisStoreState(BaseModel):
    """State model for serializing and deserializing Redis chat message store data."""

    thread_id: str
    redis_url: str | None = None
    key_prefix: str = "chat_messages"
    max_messages: int | None = None


class RedisChatMessageStore:
    """Redis-backed implementation of ChatMessageStore using Redis Lists."""

    def __init__(
        self,
        redis_url: str | None = None,
        thread_id: str | None = None,
        key_prefix: str = "chat_messages",
        max_messages: int | None = None,
    ) -> None:
        """Initialize the Redis chat message store.

        Args:
            redis_url: Redis connection URL (for example, "redis://localhost:6379").
            thread_id: Unique identifier for this conversation thread.
                      If not provided, a UUID will be auto-generated.
            key_prefix: Prefix for Redis keys to namespace different applications.
            max_messages: Maximum number of messages to retain in Redis.
                         When exceeded, oldest messages are automatically trimmed.
        """
        if redis_url is None:
            raise ValueError("redis_url is required for Redis connection")

        self.redis_url = redis_url
        self.thread_id = thread_id or f"thread_{uuid4()}"
        self.key_prefix = key_prefix
        self.max_messages = max_messages

        # Initialize Redis client
        self._redis_client = redis.from_url(redis_url, decode_responses=True)

    @property
    def redis_key(self) -> str:
        """Get the Redis key for this thread's messages."""
        return f"{self.key_prefix}:{self.thread_id}"

    async def add_messages(self, messages: Sequence[ChatMessage]) -> None:
        """Add messages to the Redis store.

        Args:
            messages: Sequence of ChatMessage objects to add to the store.
        """
        if not messages:
            return

        # Serialize messages and add to Redis list
        serialized_messages = [self._serialize_message(msg) for msg in messages]
        await self._redis_client.rpush(self.redis_key, *serialized_messages)

        # Apply message limit if configured
        if self.max_messages is not None:
            current_count = await self._redis_client.llen(self.redis_key)
            if current_count > self.max_messages:
                # Keep only the most recent max_messages using LTRIM
                await self._redis_client.ltrim(self.redis_key, -self.max_messages, -1)

    async def list_messages(self) -> list[ChatMessage]:
        """Get all messages from the store in chronological order.

        Returns:
            List of ChatMessage objects in chronological order (oldest first).
        """
        # Retrieve all messages from Redis list (oldest to newest)
        redis_messages = await self._redis_client.lrange(self.redis_key, 0, -1)

        messages = []
        for serialized_message in redis_messages:
            message = self._deserialize_message(serialized_message)
            messages.append(message)

        return messages

    async def serialize_state(self, **kwargs: Any) -> Any:
        """Serialize the current store state for persistence.

        Returns:
            Dictionary containing serialized store configuration.
        """
        state = RedisStoreState(
            thread_id=self.thread_id,
            redis_url=self.redis_url,
            key_prefix=self.key_prefix,
            max_messages=self.max_messages,
        )
        return state.model_dump(**kwargs)

    async def deserialize_state(self, serialized_store_state: Any, **kwargs: Any) -> None:
        """Deserialize state data into this store instance.

        Args:
            serialized_store_state: Previously serialized state data.
            **kwargs: Additional arguments for deserialization.
        """
        if serialized_store_state:
            state = RedisStoreState.model_validate(serialized_store_state, **kwargs)
            self.thread_id = state.thread_id
            self.key_prefix = state.key_prefix
            self.max_messages = state.max_messages

            # Recreate Redis client if the URL changed
            if state.redis_url and state.redis_url != self.redis_url:
                self.redis_url = state.redis_url
                self._redis_client = redis.from_url(self.redis_url, decode_responses=True)

    def _serialize_message(self, message: ChatMessage) -> str:
        """Serialize a ChatMessage to JSON string."""
        message_dict = message.model_dump()
        return json.dumps(message_dict, separators=(",", ":"))

    def _deserialize_message(self, serialized_message: str) -> ChatMessage:
        """Deserialize a JSON string to ChatMessage."""
        message_dict = json.loads(serialized_message)
        return ChatMessage.model_validate(message_dict)

    async def clear(self) -> None:
        """Remove all messages from the store."""
        await self._redis_client.delete(self.redis_key)

    async def aclose(self) -> None:
        """Close the Redis connection."""
        await self._redis_client.aclose()

Menggunakan ChatMessageStore kustom dengan ChatAgent

Untuk menggunakan kustom ChatMessageStore, Anda perlu menyediakan chat_message_store_factory saat membuat agen. Pabrik ini memungkinkan agen untuk membuat instans baru yang diinginkan ChatMessageStore untuk setiap utas.

Saat membuat ChatAgent, Anda dapat menyediakan parameter chat_message_store_factory bersama dengan semua opsi agen lainnya.

from azure.identity import AzureCliCredential
from agent_framework import ChatAgent
from agent_framework.openai import AzureOpenAIChatClient

# Create the chat agent with custom message store factory
agent = ChatAgent(
    chat_client=AzureOpenAIChatClient(
        endpoint="https://<myresource>.openai.azure.com",
        credential=AzureCliCredential(),
        ai_model_id="gpt-4o-mini"
    ),
    name="Joker",
    instructions="You are good at telling jokes.",
    chat_message_store_factory=lambda: RedisChatMessageStore(
        redis_url="redis://localhost:6379"
    )
)

# Use the agent with persistent chat history
thread = agent.get_new_thread()
response = await agent.run("Tell me a joke about pirates", thread=thread)
print(response.text)

Langkah selanjutnya