Поделиться через


Хранение истории чата в стороннем хранилище

В этом руководстве показано, как сохранять журнал чата агента во внешнем хранилище, реализуя пользовательский ChatHistoryProvider и используя его с ChatClientAgent.

По умолчанию при использовании ChatClientAgent история чата хранится в памяти в объекте AgentSession или службе логического вывода, если служба поддерживает эту функцию.

Если службам не требуется хранить журнал чатов в службе, можно предоставить пользовательское хранилище для сохранения журнала чата вместо использования поведения в памяти по умолчанию.

Предпосылки

Предварительные требования см. в разделе "Создание и запуск простого агента" в этом руководстве.

Установка пакетов Nuget

Чтобы использовать Microsoft Agent Framework с Azure OpenAI, необходимо установить следующие пакеты NuGet:

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.OpenAI --prerelease

Кроме того, вы будете использовать векторное хранилище в памяти для хранения сообщений чата.

dotnet add package Microsoft.SemanticKernel.Connectors.InMemory --prerelease

Создание пользовательского провайдера истории чата

Чтобы создать настраиваемый ChatHistoryProvider, необходимо реализовать абстрактный ChatHistoryProvider класс и предоставить реализации необходимых методов.

Методы хранения сообщений и извлечения

Наиболее важными методами для реализации являются следующие:

  • InvokingAsync — вызывается в начале вызова агента для получения сообщений из хранилища, которые должны быть предоставлены в качестве контекста.
  • InvokedAsync — вызывается в конце вызова агента для добавления новых сообщений в хранилище.

InvokingAsync должен возвращать сообщения в хронологическом порядке возрастания (самый старый первый). Все сообщения, возвращаемые им, будут использоваться ChatClientAgent при вызове базового IChatClientэлемента. Поэтому важно, чтобы этот метод считал ограничения базовой модели и возвращал только столько сообщений, сколько можно обрабатывать моделью.

Перед возвратом сообщений из InvokingAsync следует выполнить любую логику сокращения истории чата, например резюмирование или обрезку.

Сериализация

ChatHistoryProvider экземпляры создаются и присоединяются к AgentSession при создании сеанса и его возобновлении из сериализованного состояния.

Хотя фактические сообщения, составляющие журнал чата, хранятся во внешнем хранилище данных, экземпляру ChatHistoryProvider может потребоваться сохранить ключи или другое состояние, чтобы определить журнал чата во внешнем хранилище.

Чтобы разрешить сохранение сеансов, необходимо реализовать Serialize метод ChatHistoryProvider класса. Этот метод должен вернуть JsonElement содержащий состояние, необходимое для восстановления провайдера позже. При десериализации платформа агента передает это сериализованное состояние в ChatHistoryProviderFactory, что позволяет использовать его для повторного создания поставщика.

Пример реализации ChatHistoryProvider

В следующем примере реализации хранятся сообщения чата в векторном хранилище.

InvokedAsync выполняет вставку или обновление сообщений в векторное хранилище, используя уникальный ключ для каждого сообщения. Он сохраняет сообщения запроса и ответные сообщения из контекста вызова.

InvokingAsync извлекает сообщения для текущего сеанса из векторного хранилища, упорядочивает их по метке времени и возвращает их в хронологическом порядке возрастания (самый старый первый).

При первом вызове хранилище создает уникальный ключ для сеанса, который затем используется для идентификации журнала чата в хранилище векторов для последующих вызовов.

Уникальный ключ хранится в SessionDbKey свойстве, который сериализуется с помощью метода Serialize, а десериализуется через конструктор, принимающий JsonElement. Таким образом, этот ключ будет сохранен как часть AgentSession состояния, что позволяет возобновить сеанс позже и продолжать использовать тот же журнал чата.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.VectorData;
using Microsoft.SemanticKernel.Connectors.InMemory;

internal sealed class VectorChatHistoryProvider : ChatHistoryProvider
{
    private readonly VectorStore _vectorStore;

    public VectorChatHistoryProvider(
        VectorStore vectorStore,
        JsonElement serializedStoreState,
        JsonSerializerOptions? jsonSerializerOptions = null)
    {
        this._vectorStore = vectorStore ?? throw new ArgumentNullException(nameof(vectorStore));
        if (serializedStoreState.ValueKind is JsonValueKind.String)
        {
            this.SessionDbKey = serializedStoreState.Deserialize<string>();
        }
    }

    public string? SessionDbKey { get; private set; }

    public override async ValueTask<IEnumerable<ChatMessage>> InvokingAsync(
        InvokingContext context,
        CancellationToken cancellationToken = default)
    {
        if (this.SessionDbKey is null)
        {
            // No session key yet, so no messages to retrieve
            return [];
        }

        var collection = this._vectorStore.GetCollection<string, ChatHistoryItem>("ChatHistory");
        await collection.EnsureCollectionExistsAsync(cancellationToken);
        var records = collection
            .GetAsync(
                x => x.SessionId == this.SessionDbKey, 
                10,
                new() { OrderBy = x => x.Descending(y => y.Timestamp) },
                cancellationToken);

        List<ChatMessage> messages = [];
        await foreach (var record in records)
        {
            messages.Add(JsonSerializer.Deserialize<ChatMessage>(record.SerializedMessage!)!);
        }

        // Reverse to return in ascending chronological order (oldest first)
        messages.Reverse();
        return messages;
    }

    public override async ValueTask InvokedAsync(
        InvokedContext context,
        CancellationToken cancellationToken = default)
    {
        // Don't store messages if the request failed.
        if (context.InvokeException is not null)
        {
            return;
        }

        this.SessionDbKey ??= Guid.NewGuid().ToString("N");

        var collection = this._vectorStore.GetCollection<string, ChatHistoryItem>("ChatHistory");
        await collection.EnsureCollectionExistsAsync(cancellationToken);

        // Store request messages, response messages, and optionally AIContextProvider messages
        var allNewMessages = context.RequestMessages
            .Concat(context.AIContextProviderMessages ?? [])
            .Concat(context.ResponseMessages ?? []);

        await collection.UpsertAsync(allNewMessages.Select(x => new ChatHistoryItem()
        {
            Key = this.SessionDbKey + x.MessageId,
            Timestamp = DateTimeOffset.UtcNow,
            SessionId = this.SessionDbKey,
            SerializedMessage = JsonSerializer.Serialize(x),
            MessageText = x.Text
        }), cancellationToken);
    }

    public override JsonElement Serialize(JsonSerializerOptions? jsonSerializerOptions = null) =>
        // We have to serialize the session id, so that on deserialization you can retrieve the messages using the same session id.
        JsonSerializer.SerializeToElement(this.SessionDbKey);

    private sealed class ChatHistoryItem
    {
        [VectorStoreKey]
        public string? Key { get; set; }
        [VectorStoreData]
        public string? SessionId { get; set; }
        [VectorStoreData]
        public DateTimeOffset? Timestamp { get; set; }
        [VectorStoreData]
        public string? SerializedMessage { get; set; }
        [VectorStoreData]
        public string? MessageText { get; set; }
    }
}

Использование пользовательского ChatHistoryProvider с ChatClientAgent

Чтобы использовать настраиваемый ChatHistoryProvider, необходимо указать ChatHistoryProviderFactory при создании агента. Эта фабрика объектов позволяет агенту создавать новый экземпляр требуемого ChatHistoryProvider для каждого сеанса.

При создании ChatClientAgent объекта можно предоставить ChatClientAgentOptions объект, который позволяет задать ChatHistoryProviderFactory в дополнение к другим параметрам агента.

Фабрика — это асинхронная функция, которая получает объект контекста и маркер отмены и возвращает объект ValueTask<ChatHistoryProvider>.

using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Extensions.VectorData;
using Microsoft.SemanticKernel.Connectors.InMemory;

// Create a vector store to store the chat messages in.
VectorStore vectorStore = new InMemoryVectorStore();

AIAgent agent = new AzureOpenAIClient(
    new Uri("https://<myresource>.openai.azure.com"),
    new AzureCliCredential())
     .GetChatClient("gpt-4o-mini")
     .AsAIAgent(new ChatClientAgentOptions
     {
         Name = "Joker",
         ChatOptions = new() { Instructions = "You are good at telling jokes." },
         ChatHistoryProviderFactory = (ctx, ct) => new ValueTask<ChatHistoryProvider>(
             // Create a new chat history provider for this agent that stores the messages in a vector store.
             // Each session must get its own copy of the VectorChatHistoryProvider, since the provider
             // also contains the id that the session is stored under.
             new VectorChatHistoryProvider(
                vectorStore,
                ctx.SerializedState,
                ctx.JsonSerializerOptions))
     });

// Start a new session for the agent conversation.
AgentSession session = await agent.GetNewSessionAsync();

// Run the agent with the session
var response = await agent.RunAsync("Tell me a joke about a pirate.", session);

// The session state can be serialized for storage
JsonElement serializedSession = session.Serialize();

// Later, deserialize the session to resume the conversation
AgentSession resumedSession = await agent.DeserializeSessionAsync(serializedSession);

В этом руководстве показано, как сохранять журнал чата агента во внешнем хранилище, реализуя пользовательский ChatMessageStore и используя его с ChatAgent.

По умолчанию при использовании ChatAgent история чата хранится в памяти в объекте AgentThread или службе логического вывода, если служба поддерживает эту функцию.

Если службы не требуют или не могут хранить журнал чата в службе, можно предоставить пользовательское хранилище для сохранения журнала чата вместо того, чтобы полагаться на поведение по умолчанию в памяти.

Предпосылки

Предварительные требования см. в разделе "Создание и запуск простого агента" в этом руководстве.

Создание настраиваемого хранилища ChatMessage

Чтобы создать пользовательскую ChatMessageStore, необходимо реализовать протокол ChatMessageStore и реализовать необходимые методы.

Методы хранения сообщений и извлечения

Наиболее важными методами для реализации являются следующие:

  • add_messages — вызывается для добавления новых сообщений в хранилище.
  • list_messages — вызывается для получения сообщений из хранилища.

list_messages должен возвращать сообщения в хронологическом порядке возрастания. Сообщения, возвращаемые данной функцией, будут использоваться ChatAgent при осуществлении вызовов к базовому клиенту чата. Поэтому важно, чтобы этот метод считал ограничения базовой модели и возвращал только столько сообщений, сколько можно обрабатывать моделью.

Перед возвратом сообщений из list_messages следует выполнить любую логику сокращения истории чата, например резюмирование или обрезку.

Сериализация

ChatMessageStore экземпляры создаются и присоединяются к AgentThread при создании потока и при его возобновлении из сериализованного состояния.

Хотя фактические сообщения, составляющие журнал чата, хранятся во внешнем хранилище данных, экземпляру ChatMessageStore может потребоваться сохранить ключи или другое состояние, чтобы определить журнал чата во внешнем хранилище.

Чтобы разрешить сохранение потоков, необходимо реализовать методы serialize_state и deserialize_state протокола ChatMessageStore. Эти методы позволяют сохранить и восстановить состояние хранилища при возобновлении потока.

Пример реализации ChatMessageStore

В следующем примере реализации хранятся сообщения чата в Redis с помощью структуры данных Redis Lists.

В add_messagesнем хранятся сообщения в Redis с помощью RPUSH, чтобы добавить их в конец списка в хронологическом порядке.

list_messages извлекает сообщения для текущего потока из Redis с помощью LRANGE и возвращает их в хронологическом порядке возрастания.

При получении первого сообщения хранилище создает уникальный ключ для потока, который затем используется для идентификации журнала чата в Redis для последующих вызовов.

Уникальный ключ и другая конфигурация хранятся и могут быть сериализованы и десериализированы с помощью serialize_state методов и deserialize_state методов. Таким образом, это состояние будет сохранено в рамках состояния AgentThread, позволяя потоку возобновиться позже и продолжать использовать тот же самый журнал чата.

from collections.abc import Sequence
from typing import Any
from uuid import uuid4
from pydantic import BaseModel
import json
import redis.asyncio as redis
from agent_framework import ChatMessage


class RedisStoreState(BaseModel):
    """State model for serializing and deserializing Redis chat message store data."""

    thread_id: str
    redis_url: str | None = None
    key_prefix: str = "chat_messages"
    max_messages: int | None = None


class RedisChatMessageStore:
    """Redis-backed implementation of ChatMessageStore using Redis Lists."""

    def __init__(
        self,
        redis_url: str | None = None,
        thread_id: str | None = None,
        key_prefix: str = "chat_messages",
        max_messages: int | None = None,
    ) -> None:
        """Initialize the Redis chat message store.

        Args:
            redis_url: Redis connection URL (for example, "redis://localhost:6379").
            thread_id: Unique identifier for this conversation thread.
                      If not provided, a UUID will be auto-generated.
            key_prefix: Prefix for Redis keys to namespace different applications.
            max_messages: Maximum number of messages to retain in Redis.
                         When exceeded, oldest messages are automatically trimmed.
        """
        if redis_url is None:
            raise ValueError("redis_url is required for Redis connection")

        self.redis_url = redis_url
        self.thread_id = thread_id or f"thread_{uuid4()}"
        self.key_prefix = key_prefix
        self.max_messages = max_messages

        # Initialize Redis client
        self._redis_client = redis.from_url(redis_url, decode_responses=True)

    @property
    def redis_key(self) -> str:
        """Get the Redis key for this thread's messages."""
        return f"{self.key_prefix}:{self.thread_id}"

    async def add_messages(self, messages: Sequence[ChatMessage]) -> None:
        """Add messages to the Redis store.

        Args:
            messages: Sequence of ChatMessage objects to add to the store.
        """
        if not messages:
            return

        # Serialize messages and add to Redis list
        serialized_messages = [self._serialize_message(msg) for msg in messages]
        await self._redis_client.rpush(self.redis_key, *serialized_messages)

        # Apply message limit if configured
        if self.max_messages is not None:
            current_count = await self._redis_client.llen(self.redis_key)
            if current_count > self.max_messages:
                # Keep only the most recent max_messages using LTRIM
                await self._redis_client.ltrim(self.redis_key, -self.max_messages, -1)

    async def list_messages(self) -> list[ChatMessage]:
        """Get all messages from the store in chronological order.

        Returns:
            List of ChatMessage objects in chronological order (oldest first).
        """
        # Retrieve all messages from Redis list (oldest to newest)
        redis_messages = await self._redis_client.lrange(self.redis_key, 0, -1)

        messages = []
        for serialized_message in redis_messages:
            message = self._deserialize_message(serialized_message)
            messages.append(message)

        return messages

    async def serialize_state(self, **kwargs: Any) -> Any:
        """Serialize the current store state for persistence.

        Returns:
            Dictionary containing serialized store configuration.
        """
        state = RedisStoreState(
            thread_id=self.thread_id,
            redis_url=self.redis_url,
            key_prefix=self.key_prefix,
            max_messages=self.max_messages,
        )
        return state.model_dump(**kwargs)

    async def deserialize_state(self, serialized_store_state: Any, **kwargs: Any) -> None:
        """Deserialize state data into this store instance.

        Args:
            serialized_store_state: Previously serialized state data.
            **kwargs: Additional arguments for deserialization.
        """
        if serialized_store_state:
            state = RedisStoreState.model_validate(serialized_store_state, **kwargs)
            self.thread_id = state.thread_id
            self.key_prefix = state.key_prefix
            self.max_messages = state.max_messages

            # Recreate Redis client if the URL changed
            if state.redis_url and state.redis_url != self.redis_url:
                self.redis_url = state.redis_url
                self._redis_client = redis.from_url(self.redis_url, decode_responses=True)

    def _serialize_message(self, message: ChatMessage) -> str:
        """Serialize a ChatMessage to JSON string."""
        message_dict = message.model_dump()
        return json.dumps(message_dict, separators=(",", ":"))

    def _deserialize_message(self, serialized_message: str) -> ChatMessage:
        """Deserialize a JSON string to ChatMessage."""
        message_dict = json.loads(serialized_message)
        return ChatMessage.model_validate(message_dict)

    async def clear(self) -> None:
        """Remove all messages from the store."""
        await self._redis_client.delete(self.redis_key)

    async def aclose(self) -> None:
        """Close the Redis connection."""
        await self._redis_client.aclose()

Использование пользовательского хранилища сообщений chatMessageStore с ChatAgent

Чтобы использовать настраиваемый ChatMessageStore, необходимо указать chat_message_store_factory при создании агента. Эта фабрика позволяет агенту создавать новый экземпляр требуемого ChatMessageStore для каждого потока.

При создании ChatAgentпараметра можно указать chat_message_store_factory параметр в дополнение ко всем другим параметрам агента.

from azure.identity import AzureCliCredential
from agent_framework import ChatAgent
from agent_framework.openai import AzureOpenAIChatClient

# Create the chat agent with custom message store factory
agent = ChatAgent(
    chat_client=AzureOpenAIChatClient(
        endpoint="https://<myresource>.openai.azure.com",
        credential=AzureCliCredential(),
        ai_model_id="gpt-4o-mini"
    ),
    name="Joker",
    instructions="You are good at telling jokes.",
    chat_message_store_factory=lambda: RedisChatMessageStore(
        redis_url="redis://localhost:6379"
    )
)

# Use the agent with persistent chat history
thread = agent.get_new_thread()
response = await agent.run("Tell me a joke about pirates", thread=thread)
print(response.text)

Дальнейшие шаги