Freigeben über


Speichern des Chatverlaufs im Speicher von Drittanbietern

In diesem Lernprogramm wird gezeigt, wie Sie den Agent-Chatverlauf im externen Speicher speichern können, indem Sie eine benutzerdefinierte ChatMessageStore implementieren und diese mit einer ChatClientAgent verwenden.

Bei Verwendung ChatClientAgentwird der Chatverlauf standardmäßig entweder im Speicher des AgentThread Objekts oder im zugrunde liegenden Rückschlussdienst gespeichert, wenn der Dienst ihn unterstützt.

Wenn Dienste keinen Chatverlauf im Dienst speichern müssen, ist es möglich, einen benutzerdefinierten Speicher für das Beibehalten des Chatverlaufs bereitzustellen, anstatt sich auf das Standardverhalten im Arbeitsspeicher zu verlassen.

Voraussetzungen

Voraussetzungen finden Sie im Schritt "Erstellen und Ausführen eines einfachen Agents " in diesem Lernprogramm.

Installieren von NuGet-Paketen

Um Microsoft Agent Framework mit Azure OpenAI zu verwenden, müssen Sie die folgenden NuGet-Pakete installieren:

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.OpenAI --prerelease

Darüber hinaus verwenden Sie den speicherinternen Vektorspeicher, um Chatnachrichten zu speichern.

dotnet add package Microsoft.SemanticKernel.Connectors.InMemory --prerelease

Erstellen eines benutzerdefinierten ChatMessage-Speichers

Zum Erstellen einer benutzerdefinierten ChatMessageStoreKlasse müssen Sie die abstrakte ChatMessageStore Klasse implementieren und Implementierungen für die erforderlichen Methoden bereitstellen.

Nachrichtenspeicher- und Abrufmethoden

Die wichtigsten Zu implementierenden Methoden sind:

  • AddMessagesAsync – Wird aufgerufen, um dem Nachrichtenspeicher neue Nachrichten hinzuzufügen.
  • GetMessagesAsync – Wird aufgerufen, um die Nachrichten aus dem Speicher abzurufen.

GetMessagesAsync sollte die Nachrichten in aufsteigender chronologischer Reihenfolge zurückgeben. Alle Nachrichten, die von ihr zurückgegeben werden, werden beim Aufrufen der zugrunde liegenden ChatClientAgent durch die IChatClient verwendet. Daher ist es wichtig, dass diese Methode die Grenzen des zugrunde liegenden Modells berücksichtigt und nur so viele Nachrichten zurückgibt, wie vom Modell verarbeitet werden können.

Jede Logik zur Reduzierung des Chatverlaufs, wie beispielsweise durch Zusammenfassungen oder Kürzungen, sollte erfolgen, bevor Nachrichten von GetMessagesAsync zurückgegeben werden.

Serialisierung

ChatMessageStore Instanzen werden erstellt und an ein AgentThread angefügt, wenn der Thread erstellt wird und wenn er aus einem serialisierten Zustand fortgesetzt wird.

Während die tatsächlichen Nachrichten, aus denen der Chatverlauf besteht, extern gespeichert werden, muss die ChatMessageStore Instanz möglicherweise Schlüssel oder einen anderen Status speichern, um den Chatverlauf im externen Speicher zu identifizieren.

Um das Beibehalten von Threads zuzulassen, müssen Sie die SerializeStateAsync Methode der ChatMessageStore Klasse implementieren. Sie müssen auch einen Konstruktor bereitstellen, der einen JsonElement Parameter verwendet, der zum Deserialisieren des Zustands beim Fortsetzen eines Threads verwendet werden kann.

Beispielimplementierung von ChatMessageStore

In der folgenden Beispielimplementierung werden Chatnachrichten in einem Vektorspeicher gespeichert.

AddMessagesAsync Upsert Nachrichten in den Vektorspeicher, wobei für jede Nachricht ein eindeutiger Schlüssel verwendet wird.

GetMessagesAsync Ruft die Nachrichten für den aktuellen Thread aus dem Vektorspeicher ab, sortiert sie nach Zeitstempel und gibt sie in aufsteigender Reihenfolge zurück.

Wenn die erste Nachricht empfangen wird, generiert der Speicher einen eindeutigen Schlüssel für den Thread, der dann verwendet wird, um den Chatverlauf im Vektorspeicher für nachfolgende Anrufe zu identifizieren.

Der eindeutige Schlüssel wird in der ThreadDbKey-Eigenschaft gespeichert, die mit der SerializeStateAsync-Methode und mit dem Konstruktor JsonElement serialisiert und deserialisiert wird. Dieser Schlüssel wird daher als Teil des AgentThread-Zustands beibehalten, wodurch der Thread später fortgesetzt werden und derselbe Chatverlauf weiterhin genutzt werden kann.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.VectorData;
using Microsoft.SemanticKernel.Connectors.InMemory;

internal sealed class VectorChatMessageStore : ChatMessageStore
{
    private readonly VectorStore _vectorStore;

    public VectorChatMessageStore(
        VectorStore vectorStore,
        JsonElement serializedStoreState,
        JsonSerializerOptions? jsonSerializerOptions = null)
    {
        this._vectorStore = vectorStore ?? throw new ArgumentNullException(nameof(vectorStore));
        if (serializedStoreState.ValueKind is JsonValueKind.String)
        {
            this.ThreadDbKey = serializedStoreState.Deserialize<string>();
        }
    }

    public string? ThreadDbKey { get; private set; }

    public override async Task AddMessagesAsync(
        IEnumerable<ChatMessage> messages,
        CancellationToken cancellationToken)
    {
        this.ThreadDbKey ??= Guid.NewGuid().ToString("N");
        var collection = this._vectorStore.GetCollection<string, ChatHistoryItem>("ChatHistory");
        await collection.EnsureCollectionExistsAsync(cancellationToken);
        await collection.UpsertAsync(messages.Select(x => new ChatHistoryItem()
        {
            Key = this.ThreadDbKey + x.MessageId,
            Timestamp = DateTimeOffset.UtcNow,
            ThreadId = this.ThreadDbKey,
            SerializedMessage = JsonSerializer.Serialize(x),
            MessageText = x.Text
        }), cancellationToken);
    }

    public override async Task<IEnumerable<ChatMessage>> GetMessagesAsync(
        CancellationToken cancellationToken)
    {
        var collection = this._vectorStore.GetCollection<string, ChatHistoryItem>("ChatHistory");
        await collection.EnsureCollectionExistsAsync(cancellationToken);
        var records = collection
            .GetAsync(
                x => x.ThreadId == this.ThreadDbKey, 10,
                new() { OrderBy = x => x.Descending(y => y.Timestamp) },
                cancellationToken);

        List<ChatMessage> messages = [];
        await foreach (var record in records)
        {
            messages.Add(JsonSerializer.Deserialize<ChatMessage>(record.SerializedMessage!)!);
        }

        messages.Reverse();
        return messages;
    }

    public override JsonElement Serialize(JsonSerializerOptions? jsonSerializerOptions = null) =>
        // We have to serialize the thread id, so that on deserialization you can retrieve the messages using the same thread id.
        JsonSerializer.SerializeToElement(this.ThreadDbKey);

    private sealed class ChatHistoryItem
    {
        [VectorStoreKey]
        public string? Key { get; set; }
        [VectorStoreData]
        public string? ThreadId { get; set; }
        [VectorStoreData]
        public DateTimeOffset? Timestamp { get; set; }
        [VectorStoreData]
        public string? SerializedMessage { get; set; }
        [VectorStoreData]
        public string? MessageText { get; set; }
    }
}

Verwenden des benutzerdefinierten ChatMessageStore mit einem ChatClientAgent

Um die benutzerdefinierten ChatMessageStore zu verwenden, müssen Sie beim Erstellen eines Agents eine ChatMessageStoreFactory bereitstellen. Diese Factory ermöglicht es dem Agenten, für jeden Thread eine neue Instanz des gewünschten ChatMessageStore zu erstellen.

Beim Erstellen eines ChatClientAgent Objekts ist es möglich, ein ChatClientAgentOptions Objekt bereitzustellen, das zusätzlich zu allen anderen Agent-Optionen die ChatMessageStoreFactory Bereitstellung ermöglicht.

using Azure.AI.OpenAI;
using Azure.Identity;
using OpenAI;

AIAgent agent = new AzureOpenAIClient(
    new Uri("https://<myresource>.openai.azure.com"),
    new AzureCliCredential())
     .GetChatClient("gpt-4o-mini")
     .CreateAIAgent(new ChatClientAgentOptions
     {
         Name = "Joker",
         Instructions = "You are good at telling jokes.",
         ChatMessageStoreFactory = ctx =>
         {
             // Create a new chat message store for this agent that stores the messages in a vector store.
             return new VectorChatMessageStore(
                new InMemoryVectorStore(),
                ctx.SerializedState,
                ctx.JsonSerializerOptions);
         }
     });

In diesem Lernprogramm wird gezeigt, wie Sie den Agent-Chatverlauf im externen Speicher speichern können, indem Sie eine benutzerdefinierte ChatMessageStore implementieren und diese mit einer ChatAgent verwenden.

Bei Verwendung ChatAgentwird der Chatverlauf standardmäßig entweder im Speicher des AgentThread Objekts oder im zugrunde liegenden Rückschlussdienst gespeichert, wenn der Dienst ihn unterstützt.

Wenn Dienste den Chatverlauf nicht erfordern oder nicht die Möglichkeit haben, ihn im Dienst zu speichern, ist es möglich, anstelle des Standardverhaltens im Arbeitsspeicher einen benutzerdefinierten Speicher für die Speicherung des Chatverlaufs bereitzustellen.

Voraussetzungen

Voraussetzungen finden Sie im Schritt "Erstellen und Ausführen eines einfachen Agents " in diesem Lernprogramm.

Erstellen eines benutzerdefinierten ChatMessage-Speichers

Zum Erstellen einer benutzerdefinierten ChatMessageStoreMethode müssen Sie das ChatMessageStore Protokoll implementieren und Implementierungen für die erforderlichen Methoden bereitstellen.

Nachrichtenspeicher- und Abrufmethoden

Die wichtigsten Zu implementierenden Methoden sind:

  • add_messages – Wird aufgerufen, um dem Nachrichtenspeicher neue Nachrichten hinzuzufügen.
  • list_messages – Wird aufgerufen, um die Nachrichten aus dem Speicher abzurufen.

list_messages sollte die Nachrichten in aufsteigender chronologischer Reihenfolge zurückgeben. Alle Nachrichten, die von dieser Funktion zurückgegeben werden, werden vom ChatAgent verwendet, wenn Anrufe an den zugrunde liegenden Chatclient getätigt werden. Daher ist es wichtig, dass diese Methode die Grenzen des zugrunde liegenden Modells berücksichtigt und nur so viele Nachrichten zurückgibt, wie vom Modell verarbeitet werden können.

Jede Logik zur Reduzierung des Chatverlaufs, wie beispielsweise durch Zusammenfassungen oder Kürzungen, sollte erfolgen, bevor Nachrichten von list_messages zurückgegeben werden.

Serialisierung

ChatMessageStore Instanzen werden erstellt und an ein AgentThread angefügt, wenn der Thread erstellt wird und wenn er aus einem serialisierten Zustand fortgesetzt wird.

Während die tatsächlichen Nachrichten, aus denen der Chatverlauf besteht, extern gespeichert werden, muss die ChatMessageStore Instanz möglicherweise Schlüssel oder einen anderen Status speichern, um den Chatverlauf im externen Speicher zu identifizieren.

Um das Fortbestehen von Threads zu gewährleisten, müssen Sie die serialize_state und deserialize_state Methoden des ChatMessageStore Protokolls implementieren. Mit diesen Methoden kann der Zustand des Stores gespeichert und wiederhergestellt werden, wenn ein Thread fortgesetzt wird.

Beispielimplementierung von ChatMessageStore

In der folgenden Beispielimplementierung werden Chatnachrichten in Redis mithilfe der Datenstruktur "Redis Lists" gespeichert.

In add_messages, speichert sie Nachrichten in Redis mithilfe von RPUSH, um sie an das Ende der Liste in chronologischer Reihenfolge anzufügen.

list_messages Ruft die Nachrichten für den aktuellen Thread mithilfe von LRANGE aus Redis ab und gibt sie in aufsteigender chronologischer Reihenfolge zurück.

Wenn die erste Nachricht empfangen wird, generiert der Informationsspeicher einen eindeutigen Schlüssel für den Thread, der dann zum Identifizieren des Chatverlaufs in Redis für nachfolgende Anrufe verwendet wird.

Der eindeutige Schlüssel und andere Konfigurationen werden gespeichert und können mithilfe der serialize_state methoden deserialize_state serialisiert und deserialisiert werden. Dieser Zustand wird daher als Teil des AgentThread Zustands gespeichert, um den Thread später wieder aufzunehmen und denselben Chatverlauf weiter zu verwenden.

from collections.abc import Sequence
from typing import Any
from uuid import uuid4
from pydantic import BaseModel
import json
import redis.asyncio as redis
from agent_framework import ChatMessage


class RedisStoreState(BaseModel):
    """State model for serializing and deserializing Redis chat message store data."""

    thread_id: str
    redis_url: str | None = None
    key_prefix: str = "chat_messages"
    max_messages: int | None = None


class RedisChatMessageStore:
    """Redis-backed implementation of ChatMessageStore using Redis Lists."""

    def __init__(
        self,
        redis_url: str | None = None,
        thread_id: str | None = None,
        key_prefix: str = "chat_messages",
        max_messages: int | None = None,
    ) -> None:
        """Initialize the Redis chat message store.

        Args:
            redis_url: Redis connection URL (for example, "redis://localhost:6379").
            thread_id: Unique identifier for this conversation thread.
                      If not provided, a UUID will be auto-generated.
            key_prefix: Prefix for Redis keys to namespace different applications.
            max_messages: Maximum number of messages to retain in Redis.
                         When exceeded, oldest messages are automatically trimmed.
        """
        if redis_url is None:
            raise ValueError("redis_url is required for Redis connection")

        self.redis_url = redis_url
        self.thread_id = thread_id or f"thread_{uuid4()}"
        self.key_prefix = key_prefix
        self.max_messages = max_messages

        # Initialize Redis client
        self._redis_client = redis.from_url(redis_url, decode_responses=True)

    @property
    def redis_key(self) -> str:
        """Get the Redis key for this thread's messages."""
        return f"{self.key_prefix}:{self.thread_id}"

    async def add_messages(self, messages: Sequence[ChatMessage]) -> None:
        """Add messages to the Redis store.

        Args:
            messages: Sequence of ChatMessage objects to add to the store.
        """
        if not messages:
            return

        # Serialize messages and add to Redis list
        serialized_messages = [self._serialize_message(msg) for msg in messages]
        await self._redis_client.rpush(self.redis_key, *serialized_messages)

        # Apply message limit if configured
        if self.max_messages is not None:
            current_count = await self._redis_client.llen(self.redis_key)
            if current_count > self.max_messages:
                # Keep only the most recent max_messages using LTRIM
                await self._redis_client.ltrim(self.redis_key, -self.max_messages, -1)

    async def list_messages(self) -> list[ChatMessage]:
        """Get all messages from the store in chronological order.

        Returns:
            List of ChatMessage objects in chronological order (oldest first).
        """
        # Retrieve all messages from Redis list (oldest to newest)
        redis_messages = await self._redis_client.lrange(self.redis_key, 0, -1)

        messages = []
        for serialized_message in redis_messages:
            message = self._deserialize_message(serialized_message)
            messages.append(message)

        return messages

    async def serialize_state(self, **kwargs: Any) -> Any:
        """Serialize the current store state for persistence.

        Returns:
            Dictionary containing serialized store configuration.
        """
        state = RedisStoreState(
            thread_id=self.thread_id,
            redis_url=self.redis_url,
            key_prefix=self.key_prefix,
            max_messages=self.max_messages,
        )
        return state.model_dump(**kwargs)

    async def deserialize_state(self, serialized_store_state: Any, **kwargs: Any) -> None:
        """Deserialize state data into this store instance.

        Args:
            serialized_store_state: Previously serialized state data.
            **kwargs: Additional arguments for deserialization.
        """
        if serialized_store_state:
            state = RedisStoreState.model_validate(serialized_store_state, **kwargs)
            self.thread_id = state.thread_id
            self.key_prefix = state.key_prefix
            self.max_messages = state.max_messages

            # Recreate Redis client if the URL changed
            if state.redis_url and state.redis_url != self.redis_url:
                self.redis_url = state.redis_url
                self._redis_client = redis.from_url(self.redis_url, decode_responses=True)

    def _serialize_message(self, message: ChatMessage) -> str:
        """Serialize a ChatMessage to JSON string."""
        message_dict = message.model_dump()
        return json.dumps(message_dict, separators=(",", ":"))

    def _deserialize_message(self, serialized_message: str) -> ChatMessage:
        """Deserialize a JSON string to ChatMessage."""
        message_dict = json.loads(serialized_message)
        return ChatMessage.model_validate(message_dict)

    async def clear(self) -> None:
        """Remove all messages from the store."""
        await self._redis_client.delete(self.redis_key)

    async def aclose(self) -> None:
        """Close the Redis connection."""
        await self._redis_client.aclose()

Verwenden des benutzerdefinierten ChatMessageStore mit einem ChatAgent

Um die benutzerdefinierten ChatMessageStore zu verwenden, müssen Sie beim Erstellen eines Agents eine chat_message_store_factory bereitstellen. Diese Factory ermöglicht es dem Agenten, für jeden Thread eine neue Instanz des gewünschten ChatMessageStore zu erstellen.

Beim Erstellen eines ChatAgentParameters können Sie zusätzlich zu allen anderen Agent-Optionen den chat_message_store_factory Parameter angeben.

from azure.identity import AzureCliCredential
from agent_framework import ChatAgent
from agent_framework.openai import AzureOpenAIChatClient

# Create the chat agent with custom message store factory
agent = ChatAgent(
    chat_client=AzureOpenAIChatClient(
        endpoint="https://<myresource>.openai.azure.com",
        credential=AzureCliCredential(),
        ai_model_id="gpt-4o-mini"
    ),
    name="Joker",
    instructions="You are good at telling jokes.",
    chat_message_store_factory=lambda: RedisChatMessageStore(
        redis_url="redis://localhost:6379"
    )
)

# Use the agent with persistent chat history
thread = agent.get_new_thread()
response = await agent.run("Tell me a joke about pirates", thread=thread)
print(response.text)

Nächste Schritte