다음을 통해 공유


타사 스토리지에 채팅 기록 저장

이 자습서에서는 사용자 지정 ChatMessageStore 을 구현하고 함께 사용하여 ChatClientAgent에이전트 채팅 기록을 외부 스토리지에 저장하는 방법을 보여 줍니다.

기본적으로 ChatClientAgent를 사용하는 경우, 서비스에서 지원하면 채팅 기록은 AgentThread 개체의 메모리 또는 기본 유추 서비스에 저장됩니다.

서비스에서 채팅 기록을 서비스에 저장할 필요가 없는 경우 기본 메모리 내 동작에 의존하는 대신 채팅 기록을 유지하기 위한 사용자 지정 저장소를 제공할 수 있습니다.

필수 조건

필수 구성 요소는 이 자습서의 간단한 에이전트 만들기 및 실행을 참조하세요.

NuGet 패키지 설치

Azure OpenAI에서 Microsoft 에이전트 프레임워크를 사용하려면 다음 NuGet 패키지를 설치해야 합니다.

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.OpenAI --prerelease

또한 메모리 내 벡터 저장소를 사용하여 채팅 메시지를 저장합니다.

dotnet add package Microsoft.SemanticKernel.Connectors.InMemory --prerelease

사용자 지정 ChatMessage 스토어 만들기

사용자 지정 ChatMessageStore을 만들려면 추상 ChatMessageStore 클래스를 구현하고 필요한 메서드에 대한 구현을 제공해야 합니다.

메시지 스토리지 및 검색 방법

구현할 가장 중요한 방법은 다음과 같습니다.

  • AddMessagesAsync - 저장소에 새 메시지를 추가하기 위해 호출됩니다.
  • GetMessagesAsync - 저장소에서 메시지를 검색하기 위해 호출됩니다.

GetMessagesAsync 는 메시지를 오름차순으로 반환해야 합니다. 반환된 모든 메시지는 ChatClientAgent가 기본IChatClient을 호출할 때 사용됩니다. 따라서 이 메서드는 기본 모델의 제한을 고려하고 모델에서 처리할 수 있는 만큼의 메시지만 반환하는 것이 중요합니다.

요약 또는 트리밍과 같은 모든 채팅 기록 줄이기 논리는 메시지를 GetMessagesAsync반환하기 전에 수행해야 합니다.

직렬화

ChatMessageStore 인스턴스는 스레드가 생성될 때와 스레드가 직렬화된 상태에서 다시 시작될 때 AgentThread에 생성되고 연결됩니다.

채팅 기록을 구성하는 실제 메시지는 외부에 저장되지만 인스턴스 ChatMessageStore 는 외부 저장소에서 채팅 기록을 식별하기 위해 키 또는 기타 상태를 저장해야 할 수 있습니다.

스레드를 지속하려면 SerializeStateAsync 클래스의 ChatMessageStore 메서드를 구현해야 합니다. 또한 스레드를 다시 시작하는 경우 상태를 역직렬화하는 데 사용할 수 있는 매개 변수를 사용하는 JsonElement 생성자를 제공해야 합니다.

샘플 ChatMessageStore 구현

다음 샘플 구현에서는 채팅 메시지를 벡터 저장소에 저장합니다.

AddMessagesAsync 각 메시지에 고유한 키를 사용하여 메시지를 벡터 저장소에 삽입하거나 업데이트합니다.

GetMessagesAsync 는 벡터 저장소에서 현재 스레드에 대한 메시지를 검색하고 타임스탬프별로 정렬한 다음 오름차순으로 반환합니다.

첫 번째 메시지가 수신되면 저장소는 스레드에 대한 고유 키를 생성한 다음, 후속 호출을 위해 벡터 저장소의 채팅 기록을 식별하는 데 사용됩니다.

고유 키는 ThreadDbKey 속성에 저장되며, 이는 SerializeStateAsync 메서드와 JsonElement를 사용하는 생성자에 의해 직렬화 및 역직렬화됩니다. 따라서 이 키는 상태의 AgentThread 일부로 유지되므로 나중에 스레드를 다시 시작하고 동일한 채팅 기록을 계속 사용할 수 있습니다.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.VectorData;
using Microsoft.SemanticKernel.Connectors.InMemory;

internal sealed class VectorChatMessageStore : ChatMessageStore
{
    private readonly VectorStore _vectorStore;

    public VectorChatMessageStore(
        VectorStore vectorStore,
        JsonElement serializedStoreState,
        JsonSerializerOptions? jsonSerializerOptions = null)
    {
        this._vectorStore = vectorStore ?? throw new ArgumentNullException(nameof(vectorStore));
        if (serializedStoreState.ValueKind is JsonValueKind.String)
        {
            this.ThreadDbKey = serializedStoreState.Deserialize<string>();
        }
    }

    public string? ThreadDbKey { get; private set; }

    public override async Task AddMessagesAsync(
        IEnumerable<ChatMessage> messages,
        CancellationToken cancellationToken)
    {
        this.ThreadDbKey ??= Guid.NewGuid().ToString("N");
        var collection = this._vectorStore.GetCollection<string, ChatHistoryItem>("ChatHistory");
        await collection.EnsureCollectionExistsAsync(cancellationToken);
        await collection.UpsertAsync(messages.Select(x => new ChatHistoryItem()
        {
            Key = this.ThreadDbKey + x.MessageId,
            Timestamp = DateTimeOffset.UtcNow,
            ThreadId = this.ThreadDbKey,
            SerializedMessage = JsonSerializer.Serialize(x),
            MessageText = x.Text
        }), cancellationToken);
    }

    public override async Task<IEnumerable<ChatMessage>> GetMessagesAsync(
        CancellationToken cancellationToken)
    {
        var collection = this._vectorStore.GetCollection<string, ChatHistoryItem>("ChatHistory");
        await collection.EnsureCollectionExistsAsync(cancellationToken);
        var records = collection
            .GetAsync(
                x => x.ThreadId == this.ThreadDbKey, 10,
                new() { OrderBy = x => x.Descending(y => y.Timestamp) },
                cancellationToken);

        List<ChatMessage> messages = [];
        await foreach (var record in records)
        {
            messages.Add(JsonSerializer.Deserialize<ChatMessage>(record.SerializedMessage!)!);
        }

        messages.Reverse();
        return messages;
    }

    public override JsonElement Serialize(JsonSerializerOptions? jsonSerializerOptions = null) =>
        // We have to serialize the thread id, so that on deserialization you can retrieve the messages using the same thread id.
        JsonSerializer.SerializeToElement(this.ThreadDbKey);

    private sealed class ChatHistoryItem
    {
        [VectorStoreKey]
        public string? Key { get; set; }
        [VectorStoreData]
        public string? ThreadId { get; set; }
        [VectorStoreData]
        public DateTimeOffset? Timestamp { get; set; }
        [VectorStoreData]
        public string? SerializedMessage { get; set; }
        [VectorStoreData]
        public string? MessageText { get; set; }
    }
}

ChatClientAgent와 함께 사용자 지정 ChatMessageStore 사용

사용자 지정 ChatMessageStore을 사용하려면 에이전트를 ChatMessageStoreFactory 만들 때 제공해야 합니다. 이 팩터리를 사용하면 에이전트가 각 스레드에 대해 원하는 ChatMessageStore 새 인스턴스를 만들 수 있습니다.

ChatClientAgent를 만들 때 다른 모든 에이전트 옵션에 추가로 ChatClientAgentOptions 개체를 ChatMessageStoreFactory 제공할 수 있습니다.

using Azure.AI.OpenAI;
using Azure.Identity;
using OpenAI;

AIAgent agent = new AzureOpenAIClient(
    new Uri("https://<myresource>.openai.azure.com"),
    new AzureCliCredential())
     .GetChatClient("gpt-4o-mini")
     .CreateAIAgent(new ChatClientAgentOptions
     {
         Name = "Joker",
         Instructions = "You are good at telling jokes.",
         ChatMessageStoreFactory = ctx =>
         {
             // Create a new chat message store for this agent that stores the messages in a vector store.
             return new VectorChatMessageStore(
                new InMemoryVectorStore(),
                ctx.SerializedState,
                ctx.JsonSerializerOptions);
         }
     });

이 자습서에서는 사용자 지정 ChatMessageStore 을 구현하고 함께 사용하여 ChatAgent에이전트 채팅 기록을 외부 스토리지에 저장하는 방법을 보여 줍니다.

기본적으로 ChatAgent를 사용하는 경우, 서비스에서 지원하면 채팅 기록은 AgentThread 개체의 메모리 또는 기본 유추 서비스에 저장됩니다.

서비스에서 채팅 기록을 서비스에 저장할 필요가 없거나 사용할 수 없는 경우 기본 메모리 내 동작에 의존하는 대신 채팅 기록을 유지하기 위한 사용자 지정 저장소를 제공할 수 있습니다.

필수 조건

필수 구성 요소는 이 자습서의 간단한 에이전트 만들기 및 실행을 참조하세요.

사용자 지정 ChatMessage 스토어 만들기

사용자 지정 ChatMessageStore을 만들려면 프로토콜을 ChatMessageStore 구현하고 필요한 메서드에 대한 구현을 제공해야 합니다.

메시지 스토리지 및 검색 방법

구현할 가장 중요한 방법은 다음과 같습니다.

  • add_messages - 저장소에 새 메시지를 추가하기 위해 호출됩니다.
  • list_messages - 저장소에서 메시지를 검색하기 위해 호출됩니다.

list_messages 는 메시지를 오름차순으로 반환해야 합니다. 반환된 모든 메시지는 기본 채팅 클라이언트를 ChatAgent 호출할 때 사용됩니다. 따라서 이 메서드는 기본 모델의 제한을 고려하고 모델에서 처리할 수 있는 만큼의 메시지만 반환하는 것이 중요합니다.

요약 또는 트리밍과 같은 모든 채팅 기록 줄이기 논리는 메시지를 list_messages반환하기 전에 수행해야 합니다.

직렬화

ChatMessageStore 인스턴스는 스레드가 생성될 때와 스레드가 직렬화된 상태에서 다시 시작될 때 AgentThread에 생성되고 연결됩니다.

채팅 기록을 구성하는 실제 메시지는 외부에 저장되지만 인스턴스 ChatMessageStore 는 외부 저장소에서 채팅 기록을 식별하기 위해 키 또는 기타 상태를 저장해야 할 수 있습니다.

스레드를 지속시키려면 serialize_state 프로토콜의 deserialize_state 메서드와 ChatMessageStore 메서드를 구현해야 합니다. 이러한 메서드를 사용하면 스레드를 다시 열 때 저장소의 상태를 유지 및 복원할 수 있습니다.

샘플 ChatMessageStore 구현

다음 샘플 구현에서는 Redis 목록 데이터 구조를 사용하여 Redis에 채팅 메시지를 저장합니다.

에서 add_messagesRPUSH를 사용하여 Redis에 메시지를 저장하여 목록 끝에 시간순으로 추가합니다.

list_messages 는 LRANGE를 사용하여 Redis에서 현재 스레드에 대한 메시지를 검색하고 오름차순으로 반환합니다.

첫 번째 메시지가 수신되면 저장소는 스레드에 대한 고유 키를 생성한 다음, 후속 호출에 대한 Redis의 채팅 기록을 식별하는 데 사용됩니다.

고유 키 및 기타 구성은 저장되며, 및 serialize_state 메서드를 사용하여 deserialize_state 직렬화 및 역직렬화할 수 있습니다. 따라서 이 상태는 상태의 AgentThread 일부로 유지되므로 나중에 스레드를 다시 시작하고 동일한 채팅 기록을 계속 사용할 수 있습니다.

from collections.abc import Sequence
from typing import Any
from uuid import uuid4
from pydantic import BaseModel
import json
import redis.asyncio as redis
from agent_framework import ChatMessage


class RedisStoreState(BaseModel):
    """State model for serializing and deserializing Redis chat message store data."""

    thread_id: str
    redis_url: str | None = None
    key_prefix: str = "chat_messages"
    max_messages: int | None = None


class RedisChatMessageStore:
    """Redis-backed implementation of ChatMessageStore using Redis Lists."""

    def __init__(
        self,
        redis_url: str | None = None,
        thread_id: str | None = None,
        key_prefix: str = "chat_messages",
        max_messages: int | None = None,
    ) -> None:
        """Initialize the Redis chat message store.

        Args:
            redis_url: Redis connection URL (for example, "redis://localhost:6379").
            thread_id: Unique identifier for this conversation thread.
                      If not provided, a UUID will be auto-generated.
            key_prefix: Prefix for Redis keys to namespace different applications.
            max_messages: Maximum number of messages to retain in Redis.
                         When exceeded, oldest messages are automatically trimmed.
        """
        if redis_url is None:
            raise ValueError("redis_url is required for Redis connection")

        self.redis_url = redis_url
        self.thread_id = thread_id or f"thread_{uuid4()}"
        self.key_prefix = key_prefix
        self.max_messages = max_messages

        # Initialize Redis client
        self._redis_client = redis.from_url(redis_url, decode_responses=True)

    @property
    def redis_key(self) -> str:
        """Get the Redis key for this thread's messages."""
        return f"{self.key_prefix}:{self.thread_id}"

    async def add_messages(self, messages: Sequence[ChatMessage]) -> None:
        """Add messages to the Redis store.

        Args:
            messages: Sequence of ChatMessage objects to add to the store.
        """
        if not messages:
            return

        # Serialize messages and add to Redis list
        serialized_messages = [self._serialize_message(msg) for msg in messages]
        await self._redis_client.rpush(self.redis_key, *serialized_messages)

        # Apply message limit if configured
        if self.max_messages is not None:
            current_count = await self._redis_client.llen(self.redis_key)
            if current_count > self.max_messages:
                # Keep only the most recent max_messages using LTRIM
                await self._redis_client.ltrim(self.redis_key, -self.max_messages, -1)

    async def list_messages(self) -> list[ChatMessage]:
        """Get all messages from the store in chronological order.

        Returns:
            List of ChatMessage objects in chronological order (oldest first).
        """
        # Retrieve all messages from Redis list (oldest to newest)
        redis_messages = await self._redis_client.lrange(self.redis_key, 0, -1)

        messages = []
        for serialized_message in redis_messages:
            message = self._deserialize_message(serialized_message)
            messages.append(message)

        return messages

    async def serialize_state(self, **kwargs: Any) -> Any:
        """Serialize the current store state for persistence.

        Returns:
            Dictionary containing serialized store configuration.
        """
        state = RedisStoreState(
            thread_id=self.thread_id,
            redis_url=self.redis_url,
            key_prefix=self.key_prefix,
            max_messages=self.max_messages,
        )
        return state.model_dump(**kwargs)

    async def deserialize_state(self, serialized_store_state: Any, **kwargs: Any) -> None:
        """Deserialize state data into this store instance.

        Args:
            serialized_store_state: Previously serialized state data.
            **kwargs: Additional arguments for deserialization.
        """
        if serialized_store_state:
            state = RedisStoreState.model_validate(serialized_store_state, **kwargs)
            self.thread_id = state.thread_id
            self.key_prefix = state.key_prefix
            self.max_messages = state.max_messages

            # Recreate Redis client if the URL changed
            if state.redis_url and state.redis_url != self.redis_url:
                self.redis_url = state.redis_url
                self._redis_client = redis.from_url(self.redis_url, decode_responses=True)

    def _serialize_message(self, message: ChatMessage) -> str:
        """Serialize a ChatMessage to JSON string."""
        message_dict = message.model_dump()
        return json.dumps(message_dict, separators=(",", ":"))

    def _deserialize_message(self, serialized_message: str) -> ChatMessage:
        """Deserialize a JSON string to ChatMessage."""
        message_dict = json.loads(serialized_message)
        return ChatMessage.model_validate(message_dict)

    async def clear(self) -> None:
        """Remove all messages from the store."""
        await self._redis_client.delete(self.redis_key)

    async def aclose(self) -> None:
        """Close the Redis connection."""
        await self._redis_client.aclose()

ChatAgent와 함께 사용자 지정 ChatMessageStore 사용

사용자 지정 ChatMessageStore을 사용하려면 에이전트를 chat_message_store_factory 만들 때 제공해야 합니다. 이 팩터리를 사용하면 에이전트가 각 스레드에 대해 원하는 ChatMessageStore 새 인스턴스를 만들 수 있습니다.

ChatAgent을(를) 만들 때 다른 모든 에이전트 옵션과 함께 chat_message_store_factory 매개 변수를 제공할 수 있습니다.

from azure.identity import AzureCliCredential
from agent_framework import ChatAgent
from agent_framework.openai import AzureOpenAIChatClient

# Create the chat agent with custom message store factory
agent = ChatAgent(
    chat_client=AzureOpenAIChatClient(
        endpoint="https://<myresource>.openai.azure.com",
        credential=AzureCliCredential(),
        ai_model_id="gpt-4o-mini"
    ),
    name="Joker",
    instructions="You are good at telling jokes.",
    chat_message_store_factory=lambda: RedisChatMessageStore(
        redis_url="redis://localhost:6379"
    )
)

# Use the agent with persistent chat history
thread = agent.get_new_thread()
response = await agent.run("Tell me a joke about pirates", thread=thread)
print(response.text)

다음 단계