Build a RAG chatbot with the Azure Cosmos DB for NoSQL API

In this guide, we demonstrate how to build a RAG pattern application using a subset of the MovieLens dataset. The sample leverages the Python SDK for Azure Cosmos DB for NoSQL to perform vector search for RAG, store and retrieve chat history, and store the vectors of the chat history to use as a semantic cache. Azure OpenAI is used to generate embeddings and LLM (large language model) completions.

At the end, we build a simple UX using Gradio that lets users type in questions and displays responses either generated by Azure OpenAI or served from the cache. The responses also show the elapsed time, illustrating the performance impact of caching compared to generating a response.

Tip

You can find the full Python notebook sample here.

For more RAG samples, see: AzureDataRetrievalAugmentedGenerationSamples

Important

This sample requires you to set up accounts for Azure Cosmos DB for NoSQL and Azure OpenAI. To get started, see:

1. Install the required packages

Install the Python packages required to interact with Azure Cosmos DB and the other services.

! pip install --user python-dotenv
! pip install --user aiohttp
! pip install --user openai
! pip install --user gradio
! pip install --user ijson
! pip install --user nest_asyncio
! pip install --user tenacity
# Note: ensure you have azure-cosmos version 4.7 or higher installed
! pip install --user azure-cosmos

2. Initialize your client connection

Populate sample_env_file.env with the appropriate credentials for Azure Cosmos DB and Azure OpenAI.

cosmos_uri = "https://<replace with cosmos db account name>.documents.azure.com:443/"
cosmos_key = "<replace with cosmos db account key>"
cosmos_database_name = "database"
cosmos_collection_name = "vectorstore"
cosmos_vector_property_name = "vector"
cosmos_cache_database_name = "database"
cosmos_cache_collection_name = "vectorcache"
openai_endpoint = "<replace with azure openai endpoint>"
openai_key = "<replace with azure openai key>"
openai_type = "azure"
openai_api_version = "2023-05-15"
openai_embeddings_deployment = "<replace with azure openai embeddings deployment name>"
openai_embeddings_model = "<replace with azure openai embeddings model - e.g. text-embedding-3-large>"
openai_embeddings_dimensions = "1536"
openai_completions_deployment = "<replace with azure openai completions deployment name>"
openai_completions_model = "<replace with azure openai completions model - e.g. gpt-35-turbo>"
storage_file_url = "https://cosmosdbcosmicworks.blob.core.windows.net/fabcondata/movielens_dataset.json"
# Import the required libraries
import time
import json
import uuid
import urllib.request
import ijson
import zipfile
from dotenv import dotenv_values
from openai import AzureOpenAI
from azure.core.exceptions import AzureError
from azure.cosmos import PartitionKey, exceptions
from time import sleep
import gradio as gr

# Cosmos DB imports
from azure.cosmos import CosmosClient

# Load configuration
env_name = "sample_env_file.env"
config = dotenv_values(env_name)

cosmos_conn = config['cosmos_uri']
cosmos_key = config['cosmos_key']
cosmos_database = config['cosmos_database_name']
cosmos_collection = config['cosmos_collection_name']
cosmos_vector_property = config['cosmos_vector_property_name']
cosmos_cache_db = config['cosmos_cache_database_name']
cosmos_cache = config['cosmos_cache_collection_name']

# Create the Azure Cosmos DB for NoSQL client
cosmos_client = CosmosClient(url=cosmos_conn, credential=cosmos_key)

openai_endpoint = config['openai_endpoint']
openai_key = config['openai_key']
openai_api_version = config['openai_api_version']
openai_embeddings_deployment = config['openai_embeddings_deployment']
openai_embeddings_dimensions = int(config['openai_embeddings_dimensions'])
openai_completions_deployment = config['openai_completions_deployment']

# Movies file url
storage_file_url = config['storage_file_url']

# Create the OpenAI client
openai_client = AzureOpenAI(azure_endpoint=openai_endpoint, api_key=openai_key, api_version=openai_api_version)
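
As an optional sanity check (not part of the original notebook), you can confirm the Cosmos DB connection by listing the databases visible to this account:

# Optional sanity check: list the databases on the account to confirm the
# Cosmos DB endpoint and key are valid.
for database_properties in cosmos_client.list_databases():
    print(database_properties['id'])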

3. Create a database and containers with vector policies

The following code creates the database and two containers: one for the movie data and one for the chat history cache. The vector embedding policy specifies the document property that stores vectors, the number of vector dimensions used for the embeddings, and the distance function; the indexing policy adds a vector index on that property.

db = cosmos_client.create_database_if_not_exists(cosmos_database)

# Create the vector embedding policy to specify vector details
vector_embedding_policy = {
    "vectorEmbeddings": [ 
        { 
            "path":"/" + cosmos_vector_property,
            "dataType":"float32",
            "distanceFunction":"cosine",
            "dimensions":openai_embeddings_dimensions
        }, 
    ]
}

# Create the indexing policy; exclude the vector path from standard indexing
indexing_policy = {
    "includedPaths": [
        {
            "path": "/*"
        }
    ],
    "excludedPaths": [
        {
            "path": "/\"_etag\"/?"
        },
        {
            "path": "/" + cosmos_vector_property + "/*"
        }
    ],
    "vectorIndexes": [ 
        {
            "path": "/"+cosmos_vector_property, 
            "type": "quantizedFlat" 
        }
    ]
} 

# Create the data collection with vector index (note: this creates a container with 10000 RUs to allow fast data load)
try:
    movies_container = db.create_container_if_not_exists(id=cosmos_collection, 
                                                  partition_key=PartitionKey(path='/id'),
                                                  indexing_policy=indexing_policy, 
                                                  vector_embedding_policy=vector_embedding_policy,
                                                  offer_throughput=10000) 
    print('Container with id \'{0}\' created'.format(movies_container.id)) 

except exceptions.CosmosHttpResponseError: 
    raise 

# Create the cache collection with vector index
try:
    cache_container = db.create_container_if_not_exists(id=cosmos_cache, 
                                                  partition_key=PartitionKey(path='/id'), 
                                                  indexing_policy=indexing_policy,
                                                  vector_embedding_policy=vector_embedding_policy,
                                                  offer_throughput=1000) 
    print('Container with id \'{0}\' created'.format(cache_container.id)) 

except exceptions.CosmosHttpResponseError: 
    raise
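
Optionally, you can read the container properties back to check that the vector embedding policy was applied (the 'vectorEmbeddingPolicy' key name is the expected property and may be absent on older API versions, so .get() is used here):

# Optional check: read back the movies container properties and print the
# vector embedding policy that was applied at creation time.
container_properties = movies_container.read()
print(container_properties.get('vectorEmbeddingPolicy'))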

4. Generate embeddings with Azure OpenAI

This function vectorizes the user input for vector search. Make sure the dimensionality and model used match the provided sample data, or regenerate the vectors with the model you want to use.

from tenacity import retry, stop_after_attempt, wait_random_exponential 
import logging
@retry(wait=wait_random_exponential(min=2, max=300), stop=stop_after_attempt(20))
def generate_embeddings(text):
    try:        
        response = openai_client.embeddings.create(
            input=text,
            model=openai_embeddings_deployment,
            dimensions=openai_embeddings_dimensions
        )
        embeddings = response.model_dump()
        return embeddings['data'][0]['embedding']
    except Exception as e:
        # Log the exception with traceback for easier debugging
        logging.error("An error occurred while generating embeddings.", exc_info=True)
        raise
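
As a quick, illustrative check, you can embed a sample string and confirm that the returned vector length matches the configured dimensionality:

# Illustrative check: the vector length should equal openai_embeddings_dimensions.
sample_vector = generate_embeddings("A heist crew enters dreams to plant an idea")
print(len(sample_vector))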

5. Load data from the JSON file

Extract the pre-vectorized MovieLens dataset from the zip file (see its location in the notebook repository here).

# Unzip the data file
with zipfile.ZipFile("../../DataSet/Movies/MovieLens-4489-256D.zip", 'r') as zip_ref:
    zip_ref.extractall("/Data")

# Load the data file
data = []
with open('/Data/MovieLens-4489-256D.json', 'r') as d:
    data = json.load(d)

# View the number of documents in the data (4489)
len(data)

6. Store the data in Azure Cosmos DB

Upsert the data into Azure Cosmos DB for NoSQL. The records are written asynchronously.

#The following code to get raw movies data is commented out in favour of
#getting prevectorized data. If you want to vectorize the raw data from
#storage_file_url, uncomment the below, and set vectorizeFlag=True

#data = urllib.request.urlopen(storage_file_url)
#data = json.load(data)

vectorizeFlag=False

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

async def generate_vectors(items, vector_property):
    # Create a thread pool executor for the synchronous generate_embeddings
    loop = asyncio.get_event_loop()
    
    # Define a function to call generate_embeddings using run_in_executor
    async def generate_embedding_for_item(item):
        try:
            # Offload the sync generate_embeddings to a thread
            vectorArray = await loop.run_in_executor(None, generate_embeddings, item['overview'])
            item[vector_property] = vectorArray
        except Exception as e:
            # Log or handle exceptions if needed
            logging.error(f"Error generating embedding for item: {item['overview'][:50]}...", exc_info=True)
    
    # Create tasks for all the items to generate embeddings concurrently
    tasks = [generate_embedding_for_item(item) for item in items]
    
    # Run all the tasks concurrently and wait for their completion
    await asyncio.gather(*tasks)
    
    return items

async def insert_data(vectorize=False):
    start_time = time.time()  # Record the start time
    
    # If vectorize flag is True, generate vectors for the data
    if vectorize:
        print("Vectorizing data, please wait...")
        global data
        data = await generate_vectors(data, "vector")

    counter = 0
    tasks = []
    max_concurrency = 5  # Adjust this value to control the level of concurrency
    semaphore = asyncio.Semaphore(max_concurrency)
    print("Starting doc load, please wait...")
    
    def upsert_item_sync(obj):
        movies_container.upsert_item(body=obj)
    
    async def upsert_object(obj):
        nonlocal counter
        async with semaphore:
            await asyncio.get_event_loop().run_in_executor(None, upsert_item_sync, obj)
            # Progress reporting
            counter += 1
            if counter % 100 == 0:
                print(f"Sent {counter} documents for insertion into collection.")
    
    for obj in data:
        tasks.append(asyncio.create_task(upsert_object(obj)))
    
    # Run all upsert tasks concurrently within the limits set by the semaphore
    await asyncio.gather(*tasks)
    
    end_time = time.time()  # Record the end time
    duration = end_time - start_time  # Calculate the duration
    print(f"All {counter} documents inserted!")
    print(f"Time taken: {duration:.2f} seconds ({duration:.3f} milliseconds)")

# Run the async function with the vectorize flag set to True or False as needed
await insert_data(vectorizeFlag)  # or await insert_data() for default
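
As an optional verification step (not in the original notebook), you can count the documents now stored in the movies container:

# Optional verification: count the documents that were upserted.
document_count = list(movies_container.query_items(
    query="SELECT VALUE COUNT(1) FROM c",
    enable_cross_partition_query=True
))[0]
print(f"Documents in '{cosmos_collection}': {document_count}")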

7. Perform a vector search

This function defines a vector search over the movie data and the chat cache collections.

def vector_search(container, vectors, similarity_score=0.02, num_results=5):
    results = container.query_items(
        query='''
        SELECT TOP @num_results c.overview, VectorDistance(c.vector, @embedding) as SimilarityScore 
        FROM c
        WHERE VectorDistance(c.vector,@embedding) > @similarity_score
        ORDER BY VectorDistance(c.vector,@embedding)
        ''',
        parameters=[
            {"name": "@embedding", "value": vectors},
            {"name": "@num_results", "value": num_results},
            {"name": "@similarity_score", "value": similarity_score}
        ],
        enable_cross_partition_query=True,
        populate_query_metrics=True
    )
    results = list(results)
    formatted_results = [{'SimilarityScore': result.pop('SimilarityScore'), 'document': result} for result in results]

    return formatted_results
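
As an illustrative usage example (assuming the data from step 6 is loaded), embed a question and inspect the top matches:

# Illustrative usage: embed a query and run a vector search over the movies container.
query_vector = generate_embeddings("movies about space exploration")
for match in vector_search(movies_container, query_vector, num_results=3):
    print(round(match['SimilarityScore'], 4), match['document']['overview'][:80])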

8. Get recent chat history

This function provides conversational context to the LLM, allowing it to have a better dialogue with the user.

def get_chat_history(container, completions=3):
    results = container.query_items(
        query='''
        SELECT TOP @completions *
        FROM c
        ORDER BY c._ts DESC
        ''',
        parameters=[
            {"name": "@completions", "value": completions},
        ],
        enable_cross_partition_query=True
    )
    results = list(results)
    return results

9. Chat completion functions

Define the functions that handle the chat completion process, including caching the responses.

def generate_completion(user_prompt, vector_search_results, chat_history):
    system_prompt = '''
    You are an intelligent assistant for movies. You are designed to provide helpful answers to user questions about movies in your database.
    You are friendly, helpful, and informative and can be lighthearted. Be concise in your responses, but still friendly.
     - Only answer questions related to the information provided below. Provide at least 3 candidate movie answers in a list.
     - Write two lines of whitespace between each answer in the list.
    '''

    messages = [{'role': 'system', 'content': system_prompt}]
    for chat in chat_history:
        messages.append({'role': 'user', 'content': chat['prompt'] + " " + chat['completion']})
    messages.append({'role': 'user', 'content': user_prompt})
    for result in vector_search_results:
        messages.append({'role': 'system', 'content': json.dumps(result['document'])})

    response = openai_client.chat.completions.create(
        model=openai_completions_deployment,
        messages=messages,
        temperature=0.1
    )    
    return response.model_dump()

def chat_completion(cache_container, movies_container, user_input):
    print("starting completion")
    # Generate embeddings from the user input
    user_embeddings = generate_embeddings(user_input)
    # Query the chat history cache first to see if this question has been asked before
    cache_results = get_cache(container=cache_container, vectors=user_embeddings, similarity_score=0.99, num_results=1)
    if len(cache_results) > 0:
        print("Cached Result\n")
        return cache_results[0]['completion'], True
        
    else:
        # Perform vector search on the movie collection
        print("New result\n")
        search_results = vector_search(movies_container, user_embeddings)

        print("Getting Chat History\n")
        # Chat history
        chat_history = get_chat_history(cache_container, 3)
        # Generate the completion
        print("Generating completions \n")
        completions_results = generate_completion(user_input, search_results, chat_history)

        print("Caching response \n")
        # Cache the response
        cache_response(cache_container, user_input, user_embeddings, completions_results)

        print("\n")
        # Return the generated LLM completion
        return completions_results['choices'][0]['message']['content'], False

10. Cache generated responses

Save the user prompts and generated completions to the cache to make future responses faster.

def cache_response(container, user_prompt, prompt_vectors, response):
    chat_document = {
        'id': str(uuid.uuid4()),
        'prompt': user_prompt,
        'completion': response['choices'][0]['message']['content'],
        'completionTokens': str(response['usage']['completion_tokens']),
        'promptTokens': str(response['usage']['prompt_tokens']),
        'totalTokens': str(response['usage']['total_tokens']),
        'model': response['model'],
        'vector': prompt_vectors
    }
    container.create_item(body=chat_document)

def get_cache(container, vectors, similarity_score=0.0, num_results=5):
    # Execute the query
    results = container.query_items(
        query= '''
        SELECT TOP @num_results *
        FROM c
        WHERE VectorDistance(c.vector,@embedding) > @similarity_score
        ORDER BY VectorDistance(c.vector,@embedding)
        ''',
        parameters=[
            {"name": "@embedding", "value": vectors},
            {"name": "@num_results", "value": num_results},
            {"name": "@similarity_score", "value": similarity_score},
        ],
        enable_cross_partition_query=True, populate_query_metrics=True)
    results = list(results)
    return results
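
Before wiring up the UI, you can exercise the full flow directly; the question below is only an example. Asking a semantically similar question a second time should then be answered from the cache:

# Illustrative end-to-end test: the first call generates a new completion;
# a similar follow-up question should be served from the semantic cache.
answer, was_cached = chat_completion(cache_container, movies_container,
                                     "Can you recommend a few science fiction movies?")
print("Served from cache:", was_cached)
print(answer)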

11. Create a simple UX in Gradio

Build a user interface with Gradio for interacting with the AI application.

chat_history = []

with gr.Blocks() as demo:
    chatbot = gr.Chatbot(label="Cosmic Movie Assistant")
    msg = gr.Textbox(label="Ask me about movies in the Cosmic Movie Database!")
    clear = gr.Button("Clear")

    def user(user_message, chat_history):
        start_time = time.time()
        response_payload, cached = chat_completion(cache_container, movies_container, user_message)
        end_time = time.time()
        elapsed_time = round((end_time - start_time) * 1000, 2)
        details = f"\n (Time: {elapsed_time}ms)"
        if cached:
            details += " (Cached)"
        chat_history.append([user_message, response_payload + details])
        
        return gr.update(value=""), chat_history
    
    msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False)
    clear.click(lambda: None, None, chatbot, queue=False)

# Launch the Gradio interface
demo.launch(debug=True)

# Be sure to run this cell to close or restart the Gradio demo
demo.close()

Vector database solutions

Azure PostgreSQL Server pgvector extension

Next step