# Unzip the data filewith zipfile.ZipFile("../../DataSet/Movies/MovieLens-4489-256D.zip", 'r') as zip_ref:
zip_ref.extractall("/Data")
zip_ref.close()
# Load the data file
data = []
with open('/Data/MovieLens-4489-256D.json', 'r') as d:
data = json.load(d)
# View the number of documents in the data (4489)
len(data)
6.將資料儲存在 Azure Cosmos DB 中
將資料更新並插入 Azure Cosmos DB for NoSQL 記錄會以非同步方式寫入。
Python
#The following code to get raw movies data is commented out in favour of#getting prevectorized data. If you want to vectorize the raw data from#storage_file_url, uncomment the below, and set vectorizeFlag=True#data = urllib.request.urlopen(storage_file_url)#data = json.load(data)
vectorizeFlag=Falseimport asyncio
import time
from concurrent.futures import ThreadPoolExecutor
asyncdefgenerate_vectors(items, vector_property):# Create a thread pool executor for the synchronous generate_embeddings
loop = asyncio.get_event_loop()
# Define a function to call generate_embeddings using run_in_executorasyncdefgenerate_embedding_for_item(item):try:
# Offload the sync generate_embeddings to a thread
vectorArray = await loop.run_in_executor(None, generate_embeddings, item['overview'])
item[vector_property] = vectorArray
except Exception as e:
# Log or handle exceptions if needed
logging.error(f"Error generating embedding for item: {item['overview'][:50]}...", exc_info=True)
# Create tasks for all the items to generate embeddings concurrently
tasks = [generate_embedding_for_item(item) for item in items]
# Run all the tasks concurrently and wait for their completionawait asyncio.gather(*tasks)
return items
asyncdefinsert_data(vectorize=False):
start_time = time.time() # Record the start time# If vectorize flag is True, generate vectors for the dataif vectorize:
print("Vectorizing data, please wait...")
global data
data = await generate_vectors(data, "vector")
counter = 0
tasks = []
max_concurrency = 5# Adjust this value to control the level of concurrency
semaphore = asyncio.Semaphore(max_concurrency)
print("Starting doc load, please wait...")
defupsert_item_sync(obj):
movies_container.upsert_item(body=obj)
asyncdefupsert_object(obj):nonlocal counter
asyncwith semaphore:
await asyncio.get_event_loop().run_in_executor(None, upsert_item_sync, obj)
# Progress reporting
counter += 1if counter % 100 == 0:
print(f"Sent {counter} documents for insertion into collection.")
for obj in data:
tasks.append(asyncio.create_task(upsert_object(obj)))
# Run all upsert tasks concurrently within the limits set by the semaphoreawait asyncio.gather(*tasks)
end_time = time.time() # Record the end time
duration = end_time - start_time # Calculate the duration
print(f"All {counter} documents inserted!")
print(f"Time taken: {duration:.2f} seconds ({duration:.3f} milliseconds)")
# Run the async function with the vectorize flag set to True or False as neededawait insert_data(vectorizeFlag) # or await insert_data() for default
7.執行向量搜尋
此函式會定義電影資料和聊天快取集合的向量搜尋。
Python
defvector_search(container, vectors, similarity_score=0.02, num_results=5):
results = container.query_items(
query='''
SELECT TOP @num_results c.overview, VectorDistance(c.vector, @embedding) as SimilarityScore
FROM c
WHERE VectorDistance(c.vector,@embedding) > @similarity_score
ORDER BY VectorDistance(c.vector,@embedding)
''',
parameters=[
{"name": "@embedding", "value": vectors},
{"name": "@num_results", "value": num_results},
{"name": "@similarity_score", "value": similarity_score}
],
enable_cross_partition_query=True,
populate_query_metrics=True
)
results = list(results)
formatted_results = [{'SimilarityScore': result.pop('SimilarityScore'), 'document': result} for result in results]
return formatted_results
8.取得最近的聊天記錄
此函式會提供 LLM 的交談內容,讓其能夠更好地與使用者進行交談。
Python
defget_chat_history(container, completions=3):
results = container.query_items(
query='''
SELECT TOP @completions *
FROM c
ORDER BY c._ts DESC
''',
parameters=[
{"name": "@completions", "value": completions},
],
enable_cross_partition_query=True
)
results = list(results)
return results
9.聊天完成函式
定義函式來處理聊天完成程序,包括快取回應。
Python
defgenerate_completion(user_prompt, vector_search_results, chat_history):
system_prompt = '''
You are an intelligent assistant for movies. You are designed to provide helpful answers to user questions about movies in your database.
You are friendly, helpful, and informative and can be lighthearted. Be concise in your responses, but still friendly.
- Only answer questions related to the information provided below. Provide at least 3 candidate movie answers in a list.
- Write two lines of whitespace between each answer in the list.
'''
messages = [{'role': 'system', 'content': system_prompt}]
for chat in chat_history:
messages.append({'role': 'user', 'content': chat['prompt'] + " " + chat['completion']})
messages.append({'role': 'user', 'content': user_prompt})
for result in vector_search_results:
messages.append({'role': 'system', 'content': json.dumps(result['document'])})
response = openai_client.chat.completions.create(
model=openai_completions_deployment,
messages=messages,
temperature=0.1
)
return response.model_dump()
defchat_completion(cache_container, movies_container, user_input):
print("starting completion")
# Generate embeddings from the user input
user_embeddings = generate_embeddings(user_input)
# Query the chat history cache first to see if this question has been asked before
cache_results = get_cache(container=cache_container, vectors=user_embeddings, similarity_score=0.99, num_results=1)
if len(cache_results) > 0:
print("Cached Result\n")
return cache_results[0]['completion'], Trueelse:
# Perform vector search on the movie collection
print("New result\n")
search_results = vector_search(movies_container, user_embeddings)
print("Getting Chat History\n")
# Chat history
chat_history = get_chat_history(cache_container, 3)
# Generate the completion
print("Generating completions \n")
completions_results = generate_completion(user_input, search_results, chat_history)
print("Caching response \n")
# Cache the response
cache_response(cache_container, user_input, user_embeddings, completions_results)
print("\n")
# Return the generated LLM completionreturn completions_results['choices'][0]['message']['content'], False