Condividi tramite


Esercitazione: Creare un chatbot RAG con l'API NoSQL di Azure Cosmos DB

In questa guida viene illustrato come creare un'applicazione rag (Retrieval-Augmented Generation) usando un subset del set di dati Movie Lens. Questo esempio usa Python SDK per NoSQL di Azure Cosmos DB per eseguire la ricerca vettoriale di RAG, archiviare e recuperare la cronologia delle chat e archiviare i vettori della cronologia delle chat da usare come cache semantica. Azure OpenAI viene usato per generare incorporamenti e completamenti LLM (Large Language Model).

Alla fine viene creata un'interfaccia utente semplice per consentire agli utenti di digitare domande e visualizzare le risposte generate da Azure OpenAI o gestite dalla cache. Le risposte visualizzano anche un tempo trascorso per mostrare che l'effetto della memorizzazione nella cache ha sulle prestazioni rispetto alla generazione di una risposta.

Prerequisiti

  • Account Azure Cosmos DB for NoSQL

  • Account OpenAI di Azure

    • Implementazione di embedding usando un modello come text-embedding-3-large

    • Distribuzione dei completamenti utilizzando un modello come gpt-35-turbo

Ottenere i file dei prerequisiti

  1. Iniziare in una directory vuota.

  2. Passare alla cartella compressa di esempio in GitHub: (https://github.com/microsoft/AzureDataRetrievalAugmentedGenerationSamples/blob/main/DataSet/Movies/MovieLens-4489-256D.zip)

  3. Nel menu selezionare Scarica.

  4. Salvare il file nella cartella del progetto locale usando il nome file MovieLens-4489-256D.zip.

  5. Passare ai dati JSON prevettorizzati di esempio su GitHub: (https://github.com/microsoft/AzureDataRetrievalAugmentedGenerationSamples/blob/main/DataSet/Movies/MovieLens-4489-256D.json)

  6. Nel menu selezionare Scarica.

  7. Salvare il file nella cartella del progetto locale usando il nome file MovieLens-4489-256D.json.

  8. Nella directory del progetto creare una nuova cartella o un percorso denominato Data/.

    Importante

    Per il nome è rilevante la distinzione tra maiuscole e minuscole.

Configurare il progetto e la directory

Per iniziare, configurare il progetto Python con i pacchetti e le variabili di ambiente necessari.

  1. Aprire un terminale nella directory del progetto.

  2. Installare i pacchetti Python necessari usando questi comandi della shell.

    pip install --user python-dotenv
    pip install --user aiohttp
    pip install --user openai
    pip install --user gradio
    pip install --user ijson
    pip install --user nest_asyncio
    pip install --user tenacity
    pip install --user azure-cosmos
    

    Annotazioni

    Assicurarsi di avere azure-cosmos installato la versione 4.7 o successiva.

  3. Creare un file .env.

  4. Immettere le informazioni seguenti nel file. Usare le credenziali e i nomi delle risorse per le risorse Azure OpenAI e Azure Cosmos DB esistenti.

    cosmos_uri = "https://<azure-cosmos-db-nosql-account-name>.documents.azure.com:443/"
    cosmos_key = "<azure-cosmos-db-nosql-account-key>"
    cosmos_database_name = "ragdatabase"
    cosmos_container_name = "vectorstorecontainer"
    cosmos_vector_property_name = "vector"
    cosmos_cache_database_name = "ragdatabase"
    cosmos_cache_container_name = "vectorcachecontainer"
    openai_endpoint = "<azure-openai-account-endpoint>"
    openai_key = "<azure-openai-account-key>"
    openai_type = "azure"
    openai_api_version = "2023-05-15"
    openai_embeddings_deployment = "<azure-openai-embeddings-deployment-name>"
    openai_embeddings_model = "<azure-openai-embeddings-model>"
    openai_embeddings_dimensions = "1536"
    openai_completions_deployment = "<azure-openai-completions-deployment-name>"
    openai_completions_model = "<azure-openai-completions-model>"
    
  5. Creare un file app.py .

Inizializza la connessione del client

Usare questo codice iniziale per creare un'applicazione client e inizializzare una connessione alle risorse in Azure.

# Import the required libraries
import time
import json
import uuid
import urllib 
import ijson
import zipfile
from dotenv import dotenv_values
from openai import AzureOpenAI
from azure.core.exceptions import AzureError
from azure.cosmos import ThroughputProperties, PartitionKey, exceptions
from time import sleep
import gradio as gr

# Cosmos DB imports
from azure.cosmos import CosmosClient

# Load configuration from .env file
config = dotenv_values()

cosmos_conn = config['cosmos_uri']
cosmos_key = config['cosmos_key']
cosmos_database = config['cosmos_database_name']
cosmos_collection = config['cosmos_container_name']
cosmos_vector_property = config['cosmos_vector_property_name']
comsos_cache_db = config['cosmos_cache_database_name']
cosmos_cache = config['cosmos_cache_container_name']

# Create the Azure Cosmos DB for NoSQL async client for faster data loading
cosmos_client = CosmosClient(url=cosmos_conn, credential=cosmos_key)

openai_endpoint = config['openai_endpoint']
openai_key = config['openai_key']
openai_api_version = config['openai_api_version']
openai_embeddings_deployment = config['openai_embeddings_deployment']
openai_embeddings_dimensions = int(config['openai_embeddings_dimensions'])
openai_completions_deployment = config['openai_completions_deployment']

# Create the OpenAI client
openai_client = AzureOpenAI(azure_endpoint=openai_endpoint, api_key=openai_key, api_version=openai_api_version)

Creare un database e contenitori con criteri vettoriali

Questa funzione accetta un oggetto di database, un nome di raccolta, il nome della proprietà del documento che archivia i vettori e il numero di dimensioni vettoriali usate per gli incorporamenti.

db = cosmos_client.create_database_if_not_exists(cosmos_database)

# Create the vector embedding policy to specify vector details
vector_embedding_policy = {
    "vectorEmbeddings": [ 
        { 
            "path":"/" + cosmos_vector_property,
            "dataType":"float32",
            "distanceFunction":"cosine",
            "dimensions":openai_embeddings_dimensions
        }, 
    ]
}

# Create the vector index policy to specify vector details
indexing_policy = {
    "includedPaths": [ 
    { 
        "path": "/*" 
    } 
    ], 
    "excludedPaths": [ 
    { 
        "path": "/\"_etag\"/?",
        "path": "/" + cosmos_vector_property + "/*",
    } 
    ], 
    "vectorIndexes": [ 
        {
            "path": "/"+cosmos_vector_property, 
            "type": "quantizedFlat" 
        }
    ]
} 

# Create the data collection with vector index (note: this creates a container with an autoscale limit of 20,000 RUs to allow fast data load)
try:
    movies_container = db.create_container_if_not_exists(id=cosmos_collection, 
                                                  partition_key=PartitionKey(path='/id'),
                                                  indexing_policy=indexing_policy, 
                                                  vector_embedding_policy=vector_embedding_policy,
                                                  offer_throughput=ThroughputProperties(auto_scale_max_throughput=20000)) 
    print('Container with id \'{0}\' created'.format(movies_container.id)) 

except exceptions.CosmosHttpResponseError: 
    raise 

# Create the cache collection with vector index
try:
    cache_container = db.create_container_if_not_exists(id=cosmos_cache, 
                                                  partition_key=PartitionKey(path='/id'), 
                                                  indexing_policy=indexing_policy,
                                                  vector_embedding_policy=vector_embedding_policy,
                                                  offer_throughput=ThroughputProperties(auto_scale_max_throughput=2000)) 
    print('Container with id \'{0}\' created'.format(cache_container.id)) 

except exceptions.CosmosHttpResponseError: 
    raise

Generare incorporamenti da Azure OpenAI

Questa funzione esegue l'upsert dell'input dell'utente per la ricerca vettoriale. Verificare che la dimensionalità e il modello usati corrispondano ai dati di esempio forniti oppure rigenerare i vettori con il modello desiderato.

from tenacity import retry, stop_after_attempt, wait_random_exponential 
import logging

@retry(wait=wait_random_exponential(min=2, max=300), stop=stop_after_attempt(20))
def generate_embeddings(text):
    try:        
        response = openai_client.embeddings.create(
            input=text,
            model=openai_embeddings_deployment,
            dimensions=openai_embeddings_dimensions
        )
        embeddings = response.model_dump()
        return embeddings['data'][0]['embedding']
    except Exception as e:
        # Log the exception with traceback for easier debugging
        logging.error("An error occurred while generating embeddings.", exc_info=True)
        raise

Caricare dati dal file JSON

Estrarre il set di dati MovieLens prevectorizzato dalla cartella compressa e dal file di dati JSON.

# Unzip the data file
with zipfile.ZipFile("MovieLens-4489-256D.zip", 'r') as zip_ref:
    zip_ref.extractall("Data")
zip_ref.close()

# Load the data file
data = []
with open('MovieLens-4489-256D.json', 'r') as d:
    data = json.load(d)

# View the number of documents in the data (4489)
len(data)

Archiviare i dati in Azure Cosmos DB

Eseguire l'upsert dei dati in Azure Cosmos DB for NoSQL. I record vengono scritti in modo asincrono.

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

async def insert_data():
    start_time = time.time()  # Record the start time
    
    counter = 0
    tasks = []
    max_concurrency = 4  # Adjust this value to control the level of concurrency
    semaphore = asyncio.Semaphore(max_concurrency)
    print("Starting doc load, please wait...")
    
    def upsert_item_sync(obj):
        movies_container.upsert_item(body=obj)
    
    async def upsert_object(obj):
        nonlocal counter
        async with semaphore:
            await asyncio.get_event_loop().run_in_executor(None, upsert_item_sync, obj)
            # Progress reporting
            counter += 1
            if counter % 100 == 0:
                print(f"Sent {counter} documents for insertion into collection.")
    
    for obj in data:
        tasks.append(asyncio.create_task(upsert_object(obj)))
    
    # Run all upsert tasks concurrently within the limits set by the semaphore
    await asyncio.gather(*tasks)
    
    end_time = time.time()  # Record the end time
    duration = end_time - start_time  # Calculate the duration
    print(f"All {counter} documents inserted!")
    print(f"Time taken: {duration:.2f} seconds ({duration:.3f} milliseconds)")


# Run the async function
asyncio.run(insert_data())

Questa funzione definisce una ricerca vettoriale sui dati dei film e sulle raccolte di cache delle chat.

def vector_search(container, vectors, similarity_score=0.02, num_results=5):
    results = container.query_items(
        query='''
        SELECT TOP @num_results c.overview, VectorDistance(c.vector, @embedding) as SimilarityScore 
        FROM c
        WHERE VectorDistance(c.vector,@embedding) > @similarity_score
        ORDER BY VectorDistance(c.vector,@embedding)
        ''',
        parameters=[
            {"name": "@embedding", "value": vectors},
            {"name": "@num_results", "value": num_results},
            {"name": "@similarity_score", "value": similarity_score}
        ],
        enable_cross_partition_query=True,
        populate_query_metrics=True
    )
    results = list(results)
    formatted_results = [{'SimilarityScore': result.pop('SimilarityScore'), 'document': result} for result in results]

    return formatted_results

Ottenere la cronologia delle chat recenti

Questa funzione fornisce il contesto di conversazione all'LLM, consentendo di avere una conversazione migliore con l'utente.

def get_chat_history(container, completions=3):
    results = container.query_items(
        query='''
        SELECT TOP @completions *
        FROM c
        ORDER BY c._ts DESC
        ''',
        parameters=[
            {"name": "@completions", "value": completions},
        ],
        enable_cross_partition_query=True
    )
    results = list(results)
    return results

Funzioni di completamento della chat

Definire le funzioni per gestire il processo di completamento della chat, includendo la memorizzazione delle risposte nella cache.

def generate_completion(user_prompt, vector_search_results, chat_history):
    system_prompt = '''
    You are an intelligent assistant for movies. You are designed to provide helpful answers to user questions about movies in your database.
    You are friendly, helpful, and informative and can be lighthearted. Be concise in your responses, but still friendly.
     - Only answer questions related to the information provided below. Provide at least 3 candidate movie answers in a list.
     - Write two lines of whitespace between each answer in the list.
    '''

    messages = [{'role': 'system', 'content': system_prompt}]
    for chat in chat_history:
        messages.append({'role': 'user', 'content': chat['prompt'] + " " + chat['completion']})
    messages.append({'role': 'user', 'content': user_prompt})
    for result in vector_search_results:
        messages.append({'role': 'system', 'content': json.dumps(result['document'])})

    response = openai_client.chat.completions.create(
        model=openai_completions_deployment,
        messages=messages,
        temperature=0.1
    )    
    return response.model_dump()

def chat_completion(cache_container, movies_container, user_input):
    print("starting completion")
    # Generate embeddings from the user input
    user_embeddings = generate_embeddings(user_input)
    # Query the chat history cache first to see if this question has been asked before
    cache_results = get_cache(container=cache_container, vectors=user_embeddings, similarity_score=0.99, num_results=1)
    if len(cache_results) > 0:
        print("Cached Result\n")
        return cache_results[0]['completion'], True
        
    else:
        # Perform vector search on the movie collection
        print("New result\n")
        search_results = vector_search(movies_container, user_embeddings)

        print("Getting Chat History\n")
        # Chat history
        chat_history = get_chat_history(cache_container, 3)
        # Generate the completion
        print("Generating completions \n")
        completions_results = generate_completion(user_input, search_results, chat_history)

        print("Caching response \n")
        # Cache the response
        cache_response(cache_container, user_input, user_embeddings, completions_results)

        print("\n")
        # Return the generated LLM completion
        return completions_results['choices'][0]['message']['content'], False

Memorizzare nella cache le risposte generate

Salvare le richieste utente e i completamenti generati nella cache per risposte future più veloci.

def cache_response(container, user_prompt, prompt_vectors, response):
    chat_document = {
        'id': str(uuid.uuid4()),
        'prompt': user_prompt,
        'completion': response['choices'][0]['message']['content'],
        'completionTokens': str(response['usage']['completion_tokens']),
        'promptTokens': str(response['usage']['prompt_tokens']),
        'totalTokens': str(response['usage']['total_tokens']),
        'model': response['model'],
        'vector': prompt_vectors
    }
    container.create_item(body=chat_document)

def get_cache(container, vectors, similarity_score=0.0, num_results=5):
    # Execute the query
    results = container.query_items(
        query= '''
        SELECT TOP @num_results *
        FROM c
        WHERE VectorDistance(c.vector,@embedding) > @similarity_score
        ORDER BY VectorDistance(c.vector,@embedding)
        ''',
        parameters=[
            {"name": "@embedding", "value": vectors},
            {"name": "@num_results", "value": num_results},
            {"name": "@similarity_score", "value": similarity_score},
        ],
        enable_cross_partition_query=True, populate_query_metrics=True)
    results = list(results)
    return results

Creare un'interfaccia utente

Creare un'interfaccia utente per interagire con l'applicazione di intelligenza artificiale.

chat_history = []

with gr.Blocks() as demo:
    chatbot = gr.Chatbot(label="Cosmic Movie Assistant", type="tuples")
    msg = gr.Textbox(label="Ask me about movies in the Cosmic Movie Database!")
    clear = gr.Button("Clear")

    def user(user_message, chat_history):
        start_time = time.time()
        response_payload, cached = chat_completion(cache_container, movies_container, user_message)
        end_time = time.time()
        elapsed_time = round((end_time - start_time) * 1000, 2)
        details = f"\n (Time: {elapsed_time}ms)"
        if cached:
            details += " (Cached)"
        chat_history.append([user_message, response_payload + details])
        
        return gr.update(value=""), chat_history
    
    msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False)
    clear.click(lambda: None, None, chatbot, queue=False)

# Launch the Gradio interface
demo.launch(debug=True, share=False)

# Be sure to run this cell to close or restart the Gradio demo
demo.close()

Eseguire l'applicazione

Eseguire l'applicazione per avviare l'interfaccia Web e interagire con il set di dati.

  1. Eseguire il progetto eseguendo il comando seguente nel terminale dalla directory del progetto.

    python app.py
    
  2. Osservare l'output della console.

    Container with id 'vectorstorecontainer' created
    Container with id 'vectorcachecontainer' created
    Starting doc load, please wait...
    Sent 100 documents for insertion into collection.
    Sent 200 documents for insertion into collection.
    Sent 300 documents for insertion into collection.
    Sent 400 documents for insertion into collection.
    Sent 500 documents for insertion into collection.
    ...
    Sent 4300 documents for insertion into collection.
    Sent 4400 documents for insertion into collection.
    All 4489 documents inserted!
    Time taken: 24.66 seconds (24.659 milliseconds)
    ...
    * Running on local URL:  http://127.0.0.1:7860
    ...
    
  3. Passare all'interfaccia Web usando il browser.

  4. Eseguire qualsiasi richiesta di test usando l'interfaccia Web.

    Screenshot dell'interfaccia Web di Cosmic Movie Assistant con la conversazione di chat e il campo di input di testo.

  5. Osservare l'output aggiuntivo della console.

    ...
    New result
    
    Getting Chat History
    
    Generating completions 
    
    Caching response
    ...
    
  6. Chiudere l'interfaccia Web e l'applicazione.