Aracılığıyla paylaş


AI ajanı belleği (Model Hizmet Sunumu)

Önemli

Bu özellik Genel Önizleme aşamasındadır.

Önemli

Yeni kullanım örnekleri için Databricks aracı kodu, sunucu yapılandırması ve dağıtım iş akışı üzerinde tam denetim için Databricks Uygulamalarına aracı dağıtmanızı önerir. Bkz. Bir yapay zeka aracısı yazma ve Databricks Uygulamalarında dağıtma.

Bellek, yapay zeka aracılarının konuşmanın önceki bölümlerindeki veya önceki konuşmalardaki bilgileri hatırlamasına olanak tanır. Bu, aracıların bağlama duyarlı yanıtlar sağlamasına ve zaman içinde kişiselleştirilmiş deneyimler oluşturmasına olanak tanır. Konuşma durumunu ve geçmişini yönetmek için tam olarak yönetilen bir Postgres OLTP veritabanı olan Databricks Lakebase'i kullanın.

Gereksinimler

Kısa vadeli ve uzun süreli bellek karşılaştırması

Kısa süreli bellek tek bir konuşma oturumunda bağlamı yakalarken, uzun süreli bellek birden çok konuşmada önemli bilgileri ayıklar ve depolar. Yazılım temsilcinizi ya bellek türlerinden biriyle ya da her ikisiyle de oluşturabilirsiniz.

Kısa ve uzun süreli belleğe sahip aracılar

Kısa süreli bellek Uzun süreli bellek
Tek bir konuşma oturumunda bağlamı yakalamak için iş parçacığı ID'leri ve kontrol noktalarını kullanma
Oturumdaki izleme soruları için bağlamı koruma
Zaman yolculuğu kullanarak konuşma akışlarında hata ayıklama ve test
Birden çok oturumda önemli içgörüleri otomatik olarak ayıklama ve depolama
Geçmiş tercihlere göre etkileşimleri kişiselleştirme
Kullanıcılar hakkında zaman içinde yanıtları geliştiren bir bilgi bankası oluşturma

Not defteri örnekleri

Kısa süreli belleğe sahip ajan

Dizüstü bilgisayar al

Uzun süreli belleğe sahip ajan

Dizüstü bilgisayar al

Dağıtılan aracınızı sorgula

Aracınızı model sunma uç noktasına dağıttıktan sonra, sorgu yönergeleri için bkz. Azure Databricks'te dağıtılan aracıyı sorgulama.

Bir iş parçacığı kimliği geçirmek için extra_body parametresini kullanın. Aşağıdaki örnekte, bir iş parçacığı ID'sini ResponsesAgent uç noktaya nasıl geçireceğiniz gösterilmektedir:

   response1 = client.responses.create(
    model=endpoint,
    input=[{"role": "user", "content": "What are stateful agents?"}],
    extra_body={
        "custom_inputs": {"thread_id": thread_id}
    }
)

Playground veya Review uygulaması gibi ChatContext'i otomatik olarak geçiren bir istemci kullanıyorsanız, kısa süreli/uzun süreli bellek kullanım örnekleri için konuşma kimliği ve kullanıcı kimliği otomatik olarak geçirilir.

Kısa süreli bellek zaman yolculuğu

Kısa süreli belleği olan aracılar için denetim noktalarından yürütmeyi sürdürmek için LangGraph zaman yolculuğu kullanın. Konuşmayı yeniden yürütebilir veya alternatif yolları keşfetmek için değiştirebilirsiniz. Bir kontrol noktasından her devam ettiğinizde, LangGraph konuşma geçmişinde yeni bir çatal oluşturur ve denemeyi etkinleştirirken orijinali korur.

  1. Aracı kodunda, LangGraphResponsesAgent sınıfında denetim noktası geçmişini alan ve denetim noktası durumunu güncelleştiren işlevler oluşturun.

    from typing import List, Dict
    def get_checkpoint_history(self, thread_id: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Retrieve checkpoint history for a thread.
    
        Args:
            thread_id: The thread identifier
            limit: Maximum number of checkpoints to return
    
        Returns:
            List of checkpoint information including checkpoint_id, timestamp, and next nodes
        """
        config = {"configurable": {"thread_id": thread_id}}
    
       with CheckpointSaver(instance_name=LAKEBASE_INSTANCE_NAME) as checkpointer:
            graph = self._create_graph(checkpointer)
    
            history = []
            for state in graph.get_state_history(config):
                if len(history) >= limit:
                    break
    
                history.append({
                    "checkpoint_id": state.config["configurable"]["checkpoint_id"],
                    "thread_id": thread_id,
                    "timestamp": state.created_at,
                    "next_nodes": state.next,
                    "message_count": len(state.values.get("messages", [])),
                    # Include last message summary for context
                    "last_message": self._get_last_message_summary(state.values.get("messages", []))
                })
    
            return history
    
    def _get_last_message_summary(self, messages: List[Any]) -> Optional[str]:
        """Get a snippet of the last message for checkpoint identification"""
        return getattr(messages[-1], "content", "")[:100] if messages else None
    
    def update_checkpoint_state(self, thread_id: str, checkpoint_id: str,
                            new_messages: Optional[List[Dict]] = None) -> Dict[str, Any]:
        """Update state at a specific checkpoint (used for modifying conversation history).
    
        Args:
            thread_id: The thread identifier
            checkpoint_id: The checkpoint to update
            new_messages: Optional new messages to set at this checkpoint
    
        Returns:
            New checkpoint configuration including the new checkpoint_id
        """
        config = {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_id": checkpoint_id
            }
        }
    
        with CheckpointSaver(instance_name=LAKEBASE_INSTANCE_NAME) as checkpointer:
            graph = self._create_graph(checkpointer)
    
            # Prepare the values to update
            values = {}
            if new_messages:
                cc_msgs = self.prep_msgs_for_cc_llm(new_messages)
                values["messages"] = cc_msgs
    
            # Update the state (creates a new checkpoint)
            new_config = graph.update_state(config, values=values)
    
            return {
                "thread_id": thread_id,
                "checkpoint_id": new_config["configurable"]["checkpoint_id"],
                "parent_checkpoint_id": checkpoint_id
            }
    
  2. predict ve predict_stream işlevlerini, denetim noktalarının geçirilmesini destekleyecek şekilde güncelleştirin.

    Predict

    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """Non-streaming prediction"""
        # The same thread_id is used by BOTH predict() and predict_stream()
        ci = dict(request.custom_inputs or {})
        if "thread_id" not in ci:
            ci["thread_id"] = str(uuid.uuid4())
        request.custom_inputs = ci
    
        outputs = [
            event.item
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]
    
        # Include thread_id and checkpoint_id in custom outputs
        custom_outputs = {
            "thread_id": ci["thread_id"]
        }
        if "checkpoint_id" in ci:
            custom_outputs["parent_checkpoint_id"] = ci["checkpoint_id"]
    
        try:
            history = self.get_checkpoint_history(ci["thread_id"], limit=1)
            if history:
                custom_outputs["checkpoint_id"] = history[0]["checkpoint_id"]
        except Exception as e:
            logger.warning(f"Could not retrieve new checkpoint_id: {e}")
    
        return ResponsesAgentResponse(output=outputs, custom_outputs=custom_outputs)
    

    Predict_stream

    def predict_stream(
        self,
        request: ResponsesAgentRequest,
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """Streaming prediction with PostgreSQL checkpoint branching support.
    
        Accepts in custom_inputs:
        - thread_id: Conversation thread identifier for session
        - checkpoint_id (optional): Checkpoint to resume from (for branching)
        """
        # Get thread ID and checkpoint ID from custom inputs
        custom_inputs = request.custom_inputs or {}
        thread_id = custom_inputs.get("thread_id", str(uuid.uuid4()))  # generate new thread ID if one is not passed in
        checkpoint_id = custom_inputs.get("checkpoint_id")  # Optional for branching
    
        # Convert incoming Responses messages to LangChain format
        langchain_msgs = self.prep_msgs_for_cc_llm([i.model_dump() for i in request.input])
    
        # Build checkpoint configuration
        checkpoint_config = {"configurable": {"thread_id": thread_id}}
        # If checkpoint_id is provided, we're branching from that checkpoint
        if checkpoint_id:
            checkpoint_config["configurable"]["checkpoint_id"] = checkpoint_id
            logger.info(f"Branching from checkpoint: {checkpoint_id} in thread: {thread_id}")
    
        # DATABASE CONNECTION POOLING LOGIC FOLLOWS
        # Use connection from pool
    

Ardından kontrol noktası dallanmanızı test edin:

  1. Konuşma yazışması başlatın ve birkaç ileti ekleyin:

    from agent import AGENT
    # Initial conversation - starts a new thread
    response1 = AGENT.predict({
        "input": [{"role": "user", "content": "I'm planning for an upcoming trip!"}],
    })
    print(response1.model_dump(exclude_none=True))
    thread_id = response1.custom_outputs["thread_id"]
    
    # Within the same thread, ask a follow-up question - short-term memory will remember previous messages in the same thread/conversation session
    response2 = AGENT.predict({
        "input": [{"role": "user", "content": "I'm headed to SF!"}],
        "custom_inputs": {"thread_id": thread_id}
    })
    print(response2.model_dump(exclude_none=True))
    
    # Within the same thread, ask a follow-up question - short-term memory will remember previous messages in the same thread/conversation session
    response3 = AGENT.predict({
        "input": [{"role": "user", "content": "Where did I say I'm going?"}],
        "custom_inputs": {"thread_id": thread_id}
    })
    print(response3.model_dump(exclude_none=True))
    
    
  2. Denetim noktası geçmişini alın ve konuşmayı yeni bir mesaja dönüştürerek başka bir yöne yönlendirin:

    # Get checkpoint history to find branching point
    history = AGENT.get_checkpoint_history(thread_id, 20)
    # Retrieve checkpoint at index - indices count backward from most recent checkpoint
    index = max(1, len(history) - 4)
    branch_checkpoint = history[index]["checkpoint_id"]
    
    # Branch from node with next_node = `('__start__',)` to re-input message to agent at certain part of conversation
    # I want to update the information of which city I am going to
    # Within the same thread, branch from a checkpoint and override it with different context to continue the conversation in a new fork
    response4 = AGENT.predict({
        "input": [{"role": "user", "content": "I'm headed to New York!"}],
        "custom_inputs": {
            "thread_id": thread_id,
            "checkpoint_id": branch_checkpoint # Branch from this checkpoint!
        }
    })
    print(response4.model_dump(exclude_none=True))
    
    # Thread ID stays the same even though it branched from a checkpoint:
    branched_thread_id = response4.custom_outputs["thread_id"]
    print(f"original thread id was {thread_id}")
    print(f"new thread id after branching is the same as original: {branched_thread_id}")
    
    # Continue the conversation in the same thread and it will pick up from the information you tell it in your branch
    response5 = AGENT.predict({
        "input": [{"role": "user", "content": "Where am I going?"}],
        "custom_inputs": {
            "thread_id": thread_id,
        }
    })
    print(response5.model_dump(exclude_none=True))
    

Sonraki Adımlar