Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In dieser Anleitung wird veranschaulicht, wie Sie eine Anwendung mit RAG-Muster mit einer Teilmenge des MovieLens-Datasets erstellen. In diesem Beispiel wird das Python SDK für Azure Cosmos DB for NoSQL verwendet, um die Vektorsuche für RAG durchzuführen, Chatverläufe zu speichern und abzurufen sowie Vektoren des Chatverlaufs zu speichern, die als semantischer Cache verwendet werden. Azure OpenAI wird verwendet, um Einbettungen und LLM-Fertigstellungen (großes Sprachmodell) zu generieren.
Am Ende erstellen Sie eine einfache Benutzererfahrung (UX) mit Gradio, damit Benutzer Fragen eingeben und Antworten anzeigen können, die von Azure OpenAI generiert oder aus dem Cache bereitgestellt wurden. Die Antworten zeigen außerdem die verstrichene Zeit an, sodass die Auswirkungen von Zwischenspeichern auf die Leistung im Vergleich zum Generieren einer Antwort deutlich werden.
Tipp
Sie finden das vollständige Python-Notebookbeispiel hier.
Weitere RAG-Beispiele finden Sie unter AzureDataRetrievalAugmentedGenerationSamples.
Wichtig
Für dieses Beispiel müssen Sie Konten für Azure Cosmos DB for NoSQL und Azure OpenAI einrichten. Um zu beginnen, besuchen Sie folgende Seiten:
1. Installieren der erforderlichen Pakete
Installieren Sie die erforderlichen Python-Pakete für das Interagieren mit Azure Cosmos DB und anderen Diensten.
! pip install --user python-dotenv
! pip install --user aiohttp
! pip install --user openai
! pip install --user gradio
! pip install --user ijson
! pip install --user nest_asyncio
! pip install --user tenacity
# Note: ensure you have azure-cosmos version 4.7 or higher installed
! pip install --user azure-cosmos
2. Initialisieren der Clientverbindung
Füllen Sie sample_env_file.env
mit den entsprechenden Anmeldeinformationen für Azure Cosmos DB und Azure OpenAI auf.
cosmos_uri = "https://<replace with cosmos db account name>.documents.azure.com:443/"
cosmos_key = "<replace with cosmos db account key>"
cosmos_database_name = "database"
cosmos_collection_name = "vectorstore"
cosmos_vector_property_name = "vector"
cosmos_cache_database_name = "database"
cosmos_cache_collection_name = "vectorcache"
openai_endpoint = "<replace with azure openai endpoint>"
openai_key = "<replace with azure openai key>"
openai_type = "azure"
openai_api_version = "2023-05-15"
openai_embeddings_deployment = "<replace with azure openai embeddings deployment name>"
openai_embeddings_model = "<replace with azure openai embeddings model - e.g. text-embedding-3-large"
openai_embeddings_dimensions = "1536"
openai_completions_deployment = "<replace with azure openai completions deployment name>"
openai_completions_model = "<replace with azure openai completions model - e.g. gpt-35-turbo>"
storage_file_url = "https://cosmosdbcosmicworks.blob.core.windows.net/fabcondata/movielens_dataset.json"
# Import the required libraries
import time
import json
import uuid
import urllib
import ijson
import zipfile
from dotenv import dotenv_values
from openai import AzureOpenAI
from azure.core.exceptions import AzureError
from azure.cosmos import PartitionKey, exceptions
from time import sleep
import gradio as gr
# Cosmos DB imports
from azure.cosmos import CosmosClient
# Load configuration
env_name = "sample_env_file.env"
config = dotenv_values(env_name)
cosmos_conn = config['cosmos_uri']
cosmos_key = config['cosmos_key']
cosmos_database = config['cosmos_database_name']
cosmos_collection = config['cosmos_collection_name']
cosmos_vector_property = config['cosmos_vector_property_name']
comsos_cache_db = config['cosmos_cache_database_name']
cosmos_cache = config['cosmos_cache_collection_name']
# Create the Azure Cosmos DB for NoSQL async client for faster data loading
cosmos_client = CosmosClient(url=cosmos_conn, credential=cosmos_key)
openai_endpoint = config['openai_endpoint']
openai_key = config['openai_key']
openai_api_version = config['openai_api_version']
openai_embeddings_deployment = config['openai_embeddings_deployment']
openai_embeddings_dimensions = int(config['openai_embeddings_dimensions'])
openai_completions_deployment = config['openai_completions_deployment']
# Movies file url
storage_file_url = config['storage_file_url']
# Create the OpenAI client
openai_client = AzureOpenAI(azure_endpoint=openai_endpoint, api_key=openai_key, api_version=openai_api_version)
3. Erstellen einer Datenbank und Containern mit Vektorrichtlinien
Diese Funktion verwendet ein Datenbankobjekt, einen Sammlungsnamen, den Namen der Dokumenteigenschaft, die Vektoren speichert, sowie die Anzahl der Vektordimensionen, die für die Einbettungen verwendet werden.
db = cosmos_client.create_database_if_not_exists(cosmos_database)
# Create the vector embedding policy to specify vector details
vector_embedding_policy = {
"vectorEmbeddings": [
{
"path":"/" + cosmos_vector_property,
"dataType":"float32",
"distanceFunction":"cosine",
"dimensions":openai_embeddings_dimensions
},
]
}
# Create the vector index policy to specify vector details
indexing_policy = {
"includedPaths": [
{
"path": "/*"
}
],
"excludedPaths": [
{
"path": "/\"_etag\"/?",
"path": "/" + cosmos_vector_property + "/*",
}
],
"vectorIndexes": [
{
"path": "/"+cosmos_vector_property,
"type": "quantizedFlat"
}
]
}
# Create the data collection with vector index (note: this creates a container with 10000 RUs to allow fast data load)
try:
movies_container = db.create_container_if_not_exists(id=cosmos_collection,
partition_key=PartitionKey(path='/id'),
indexing_policy=indexing_policy,
vector_embedding_policy=vector_embedding_policy,
offer_throughput=10000)
print('Container with id \'{0}\' created'.format(movies_container.id))
except exceptions.CosmosHttpResponseError:
raise
# Create the cache collection with vector index
try:
cache_container = db.create_container_if_not_exists(id=cosmos_cache,
partition_key=PartitionKey(path='/id'),
indexing_policy=indexing_policy,
vector_embedding_policy=vector_embedding_policy,
offer_throughput=1000)
print('Container with id \'{0}\' created'.format(cache_container.id))
except exceptions.CosmosHttpResponseError:
raise
4. Generieren von Einbettungen mit Azure OpenAI
Diese Funktion vektorisiert die Benutzereingabe für die Vektorsuche. Stellen Sie sicher, dass die verwendete Dimensionalität und das verwendete Modell mit den bereitgestellten Beispieldaten übereinstimmen, oder generieren Sie Vektoren mit Ihrem gewünschten Modell erneut.
from tenacity import retry, stop_after_attempt, wait_random_exponential
import logging
@retry(wait=wait_random_exponential(min=2, max=300), stop=stop_after_attempt(20))
def generate_embeddings(text):
try:
response = openai_client.embeddings.create(
input=text,
model=openai_embeddings_deployment,
dimensions=openai_embeddings_dimensions
)
embeddings = response.model_dump()
return embeddings['data'][0]['embedding']
except Exception as e:
# Log the exception with traceback for easier debugging
logging.error("An error occurred while generating embeddings.", exc_info=True)
raise
5. Laden von Daten aus der JSON-Datei
Extrahieren Sie das vorab vektorisierte MovieLens-Dataset aus der ZIP-Datei (an diesem Speicherort im Notebookrepository).
# Unzip the data file
with zipfile.ZipFile("../../DataSet/Movies/MovieLens-4489-256D.zip", 'r') as zip_ref:
zip_ref.extractall("/Data")
zip_ref.close()
# Load the data file
data = []
with open('/Data/MovieLens-4489-256D.json', 'r') as d:
data = json.load(d)
# View the number of documents in the data (4489)
len(data)
6. Überprüfen der Daten in Azure Cosmos DB
Führen Sie ein Upsert für Daten in Azure Cosmos DB for NoSQL aus. Datensätze werden asynchron geschrieben.
#The following code to get raw movies data is commented out in favour of
#getting prevectorized data. If you want to vectorize the raw data from
#storage_file_url, uncomment the below, and set vectorizeFlag=True
#data = urllib.request.urlopen(storage_file_url)
#data = json.load(data)
vectorizeFlag=False
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
async def generate_vectors(items, vector_property):
# Create a thread pool executor for the synchronous generate_embeddings
loop = asyncio.get_event_loop()
# Define a function to call generate_embeddings using run_in_executor
async def generate_embedding_for_item(item):
try:
# Offload the sync generate_embeddings to a thread
vectorArray = await loop.run_in_executor(None, generate_embeddings, item['overview'])
item[vector_property] = vectorArray
except Exception as e:
# Log or handle exceptions if needed
logging.error(f"Error generating embedding for item: {item['overview'][:50]}...", exc_info=True)
# Create tasks for all the items to generate embeddings concurrently
tasks = [generate_embedding_for_item(item) for item in items]
# Run all the tasks concurrently and wait for their completion
await asyncio.gather(*tasks)
return items
async def insert_data(vectorize=False):
start_time = time.time() # Record the start time
# If vectorize flag is True, generate vectors for the data
if vectorize:
print("Vectorizing data, please wait...")
global data
data = await generate_vectors(data, "vector")
counter = 0
tasks = []
max_concurrency = 5 # Adjust this value to control the level of concurrency
semaphore = asyncio.Semaphore(max_concurrency)
print("Starting doc load, please wait...")
def upsert_item_sync(obj):
movies_container.upsert_item(body=obj)
async def upsert_object(obj):
nonlocal counter
async with semaphore:
await asyncio.get_event_loop().run_in_executor(None, upsert_item_sync, obj)
# Progress reporting
counter += 1
if counter % 100 == 0:
print(f"Sent {counter} documents for insertion into collection.")
for obj in data:
tasks.append(asyncio.create_task(upsert_object(obj)))
# Run all upsert tasks concurrently within the limits set by the semaphore
await asyncio.gather(*tasks)
end_time = time.time() # Record the end time
duration = end_time - start_time # Calculate the duration
print(f"All {counter} documents inserted!")
print(f"Time taken: {duration:.2f} seconds ({duration:.3f} milliseconds)")
# Run the async function with the vectorize flag set to True or False as needed
await insert_data(vectorizeFlag) # or await insert_data() for default
7. Durchführen einer Vektorsuche
Diese Funktion definiert eine Vektorsuche für die Filmdaten- und Chatcachesammlungen.
def vector_search(container, vectors, similarity_score=0.02, num_results=5):
results = container.query_items(
query='''
SELECT TOP @num_results c.overview, VectorDistance(c.vector, @embedding) as SimilarityScore
FROM c
WHERE VectorDistance(c.vector,@embedding) > @similarity_score
ORDER BY VectorDistance(c.vector,@embedding)
''',
parameters=[
{"name": "@embedding", "value": vectors},
{"name": "@num_results", "value": num_results},
{"name": "@similarity_score", "value": similarity_score}
],
enable_cross_partition_query=True,
populate_query_metrics=True
)
results = list(results)
formatted_results = [{'SimilarityScore': result.pop('SimilarityScore'), 'document': result} for result in results]
return formatted_results
8. Abrufen des aktuellen Chatverlaufs
Diese Funktion stellt unterhaltungsbezogenen Kontext für das LLM bereit, sodass es eine Unterhaltung mit dem Benutzer besser führen kann.
def get_chat_history(container, completions=3):
results = container.query_items(
query='''
SELECT TOP @completions *
FROM c
ORDER BY c._ts DESC
''',
parameters=[
{"name": "@completions", "value": completions},
],
enable_cross_partition_query=True
)
results = list(results)
return results
9. Chatvervollständigungsfunktionen
Definieren Sie die Funktionen zum Verarbeiten des Chatvervollständigungsprozesses einschließlich des Zwischenspeicherns von Antworten.
def generate_completion(user_prompt, vector_search_results, chat_history):
system_prompt = '''
You are an intelligent assistant for movies. You are designed to provide helpful answers to user questions about movies in your database.
You are friendly, helpful, and informative and can be lighthearted. Be concise in your responses, but still friendly.
- Only answer questions related to the information provided below. Provide at least 3 candidate movie answers in a list.
- Write two lines of whitespace between each answer in the list.
'''
messages = [{'role': 'system', 'content': system_prompt}]
for chat in chat_history:
messages.append({'role': 'user', 'content': chat['prompt'] + " " + chat['completion']})
messages.append({'role': 'user', 'content': user_prompt})
for result in vector_search_results:
messages.append({'role': 'system', 'content': json.dumps(result['document'])})
response = openai_client.chat.completions.create(
model=openai_completions_deployment,
messages=messages,
temperature=0.1
)
return response.model_dump()
def chat_completion(cache_container, movies_container, user_input):
print("starting completion")
# Generate embeddings from the user input
user_embeddings = generate_embeddings(user_input)
# Query the chat history cache first to see if this question has been asked before
cache_results = get_cache(container=cache_container, vectors=user_embeddings, similarity_score=0.99, num_results=1)
if len(cache_results) > 0:
print("Cached Result\n")
return cache_results[0]['completion'], True
else:
# Perform vector search on the movie collection
print("New result\n")
search_results = vector_search(movies_container, user_embeddings)
print("Getting Chat History\n")
# Chat history
chat_history = get_chat_history(cache_container, 3)
# Generate the completion
print("Generating completions \n")
completions_results = generate_completion(user_input, search_results, chat_history)
print("Caching response \n")
# Cache the response
cache_response(cache_container, user_input, user_embeddings, completions_results)
print("\n")
# Return the generated LLM completion
return completions_results['choices'][0]['message']['content'], False
10. Generierte Antworten aus dem Cache
Speichern Sie die Benutzeraufforderungen und generierten Fertigstellungen im Cache, um zukünftig schnellere Antworten zu erhalten.
def cache_response(container, user_prompt, prompt_vectors, response):
chat_document = {
'id': str(uuid.uuid4()),
'prompt': user_prompt,
'completion': response['choices'][0]['message']['content'],
'completionTokens': str(response['usage']['completion_tokens']),
'promptTokens': str(response['usage']['prompt_tokens']),
'totalTokens': str(response['usage']['total_tokens']),
'model': response['model'],
'vector': prompt_vectors
}
container.create_item(body=chat_document)
def get_cache(container, vectors, similarity_score=0.0, num_results=5):
# Execute the query
results = container.query_items(
query= '''
SELECT TOP @num_results *
FROM c
WHERE VectorDistance(c.vector,@embedding) > @similarity_score
ORDER BY VectorDistance(c.vector,@embedding)
''',
parameters=[
{"name": "@embedding", "value": vectors},
{"name": "@num_results", "value": num_results},
{"name": "@similarity_score", "value": similarity_score},
],
enable_cross_partition_query=True, populate_query_metrics=True)
results = list(results)
return results
11. Erstellen einer einfachen UX in Gradio
Erstellen Sie für das Interagieren mit der KI-Anwendung eine Benutzeroberfläche in Gradio.
chat_history = []
with gr.Blocks() as demo:
chatbot = gr.Chatbot(label="Cosmic Movie Assistant")
msg = gr.Textbox(label="Ask me about movies in the Cosmic Movie Database!")
clear = gr.Button("Clear")
def user(user_message, chat_history):
start_time = time.time()
response_payload, cached = chat_completion(cache_container, movies_container, user_message)
end_time = time.time()
elapsed_time = round((end
time - start_time) * 1000, 2)
details = f"\n (Time: {elapsed_time}ms)"
if cached:
details += " (Cached)"
chat_history.append([user_message, response_payload + details])
return gr.update(value=""), chat_history
msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False)
clear.click(lambda: None, None, chatbot, queue=False)
# Launch the Gradio interface
demo.launch(debug=True)
# Be sure to run this cell to close or restart the Gradio demo
demo.close()
Vektordatenbanklösungen
pgvector-Erweiterung für Azure PostgreSQL-Server
Zugehöriger Inhalt
- 30-tägige kostenlose Testversion ohne Azure-Abonnement
- 90-tägige kostenlose Testversion und bis zu 6.000 USD Durchsatzguthaben mit Azure KI Advantage