إشعار
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تسجيل الدخول أو تغيير الدلائل.
يتطلب الوصول إلى هذه الصفحة تخويلاً. يمكنك محاولة تغيير الدلائل.
استخدم البحث المتجه في Azure DocumentDB مع مكتبة عملاء بايثون. تخزين البيانات المتجهة والاستعلام عنها بكفاءة.
يستخدم هذا البدء السريع مجموعة بيانات فندق نموذجية في ملف JSON مع متجهات محسوبة مسبقا من النموذج text-embedding-3-small . تتضمن مجموعة البيانات أسماء الفنادق والمواقع والأوصاف وتضمينات المتجهات.
ابحث عن نموذج التعليمات البرمجية على GitHub.
Prerequisites
اشتراك Azure
- إذا لم يكن لديك اشتراك Azure، فقم بإنشاء حساب مجاني
مجموعة Azure DocumentDB موجودة
-
تم تكوين النطاق المخصص
text-embedding-3-smallتم نشر النموذج
استخدم بيئة Bash في Azure Cloud Shell. لمزيد من المعلومات، راجع بدء استخدام Azure Cloud Shell.
إذا كنت تفضل تشغيل أوامر مرجع CLI محلياً قم بتثبيت CLI Azure. إذا كنت تعمل على نظام تشغيل Windows أو macOS، ففكر في تشغيل Azure CLI في حاوية Docker. لمزيد من المعلومات، راجع كيفية تشغيل Azure CLI في حاوية Docker.
إذا كنت تستخدم تثبيت محلي، يُرجى تسجيل الدخول إلى Azure CLI مستخدمًا أمر az login. لإنهاء عملية المصادقة، اتبع الخطوات المعروضة في جهازك. للحصول على خيارات تسجيل الدخول الأخرى، راجع المصادقة على Azure باستخدام Azure CLI.
عندما يُطلب منك، قم بتثبيت ملحق Azure CLI عند الاستخدام لأول مرة. لمزيد من المعلومات حول الملحقات، راجع استخدام الملحقات وإدارتها باستخدام Azure CLI.
يُرجى تشغيل إصدار az للوصول إلى الإصدار والمكتبات التابعة التي تم تثبيتها. للتحديث لآخر إصدار، يُرجى تشغيل تحديث az.
- Python 3.9 أو أحدث
إنشاء ملف بيانات باستخدام المتجهات
أنشئ مجلد بيانات جديد لملف بيانات الفندق:
mkdir dataانسخ
Hotels_Vector.jsonملف البيانات الخام باستخدام المتجهات إلى الدليل الخاص بكdata.
إنشاء مشروع Python
قم بإنشاء دليل جديد لمشروعك وافتحه في Visual Studio Code:
mkdir vector-search-quickstart code vector-search-quickstartفي المحطة الطرفية، قم بإنشاء بيئة ظاهرية وتنشيطها:
لنظام التشغيل Windows:
python -m venv venv venv\\Scripts\\activateلنظام التشغيل macOS / Linux:
python -m venv venv source venv/bin/activateقم بتثبيت الحزم المطلوبة:
pip install pymongo azure-identity openai python-dotenv-
pymongoبرنامج تشغيل MongoDB ل Python: -
azure-identityمكتبة هوية Azure للمصادقة بدون كلمة مرور: -
openaiمكتبة عميل OpenAI لإنشاء متجهات: -
python-dotenvإدارة متغير البيئة من ملفات .env:
-
أنشئ
.envملفا لمتغيرات البيئة فيvector-search-quickstart:# Azure OpenAI configuration AZURE_OPENAI_EMBEDDING_ENDPOINT= AZURE_OPENAI_EMBEDDING_MODEL=text-embedding-3-small AZURE_OPENAI_EMBEDDING_API_VERSION=2023-05-15 # Azure DocumentDB configuration MONGO_CLUSTER_NAME= # Data Configuration (defaults should work) DATA_FILE_WITH_VECTORS=data/Hotels_Vector.json EMBEDDED_FIELD=DescriptionVector EMBEDDING_DIMENSIONS=1536 EMBEDDING_SIZE_BATCH=16 LOAD_SIZE_BATCH=50بالنسبة للمصادقة بدون كلمة مرور المستخدمة في هذه المقالة، استبدل قيم العنصر النائب في الملف
.envبمعلوماتك الخاصة:-
AZURE_OPENAI_EMBEDDING_ENDPOINTعنوان URL لنقطة نهاية مورد Azure OpenAI: -
MONGO_CLUSTER_NAME: اسم مورد Azure DocumentDB الخاص بك
يجب أن تفضل دائما المصادقة بدون كلمة مرور، ولكنها ستتطلب إعدادا إضافيا. لمزيد من المعلومات حول إعداد الهوية المدارة والنطاق الكامل لخيارات المصادقة، راجع مصادقة تطبيقات Python على خدمات Azure باستخدام Azure SDK ل Python.
-
إنشاء ملفات التعليمات البرمجية للبحث المتجه
تابع المشروع عن طريق إنشاء ملفات تعليمات برمجية للبحث المتجه. عند الانتهاء ، يجب أن يبدو هيكل المشروع كما يلي:
├── data/
│ ├── Hotels.json # Source hotel data (without vectors)
│ └── Hotels_Vector.json # Hotel data with vector embeddings
└── vector-search-quickstart/
├── src/
│ ├── diskann.py # DiskANN vector search implementation
│ ├── hnsw.py # HNSW vector search implementation
│ ├── ivf.py # IVF vector search implementation
│ └── utils.py # Shared utility functions
├── requirements.txt # Python dependencies
├── .env # Environment variables template
قم بإنشاء src دليل لملفات Python الخاصة بك. أضف ملفين: diskann.py ولتنفيذ utils.py فهرس DiskANN:
mkdir src
touch src/diskann.py
touch src/utils.py
إنشاء رمز للبحث المتجه
الصق الكود التالي في الملف diskann.py .
import os
from typing import List, Dict, Any
from utils import get_clients, get_clients_passwordless, read_file_return_json, insert_data, print_search_results, drop_vector_indexes
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
def create_diskann_vector_index(collection, vector_field: str, dimensions: int) -> None:
print(f"Creating DiskANN vector index on field '{vector_field}'...")
# Drop any existing vector indexes on this field first
drop_vector_indexes(collection, vector_field)
# Use the native MongoDB command for DocumentDB vector indexes
index_command = {
"createIndexes": collection.name,
"indexes": [
{
"name": f"diskann_index_{vector_field}",
"key": {
vector_field: "cosmosSearch" # DocumentDB vector search index type
},
"cosmosSearchOptions": {
# DiskANN algorithm configuration
"kind": "vector-diskann",
# Vector dimensions must match the embedding model
"dimensions": dimensions,
# Vector similarity metric - cosine is good for text embeddings
"similarity": "COS",
# Maximum degree: number of edges per node in the graph
# Higher values improve accuracy but increase memory usage
"maxDegree": 20,
# Build parameter: candidates evaluated during index construction
# Higher values improve index quality but increase build time
"lBuild": 10
}
}
]
}
try:
# Execute the createIndexes command directly
result = collection.database.command(index_command)
print("DiskANN vector index created successfully")
except Exception as e:
print(f"Error creating DiskANN vector index: {e}")
# Check if it's a tier limitation and suggest alternatives
if "not enabled for this cluster tier" in str(e):
print("\nDiskANN indexes require a higher cluster tier.")
print("Try one of these alternatives:")
print(" • Upgrade your DocumentDB cluster to a higher tier")
print(" • Use HNSW instead: python src/hnsw.py")
print(" • Use IVF instead: python src/ivf.py")
raise
def perform_diskann_vector_search(collection,
azure_openai_client,
query_text: str,
vector_field: str,
model_name: str,
top_k: int = 5) -> List[Dict[str, Any]]:
print(f"Performing DiskANN vector search for: '{query_text}'")
try:
# Generate embedding for the query text
embedding_response = azure_openai_client.embeddings.create(
input=[query_text],
model=model_name
)
query_embedding = embedding_response.data[0].embedding
# Construct the aggregation pipeline for vector search
# DocumentDB uses $search with cosmosSearch
pipeline = [
{
"$search": {
# Use cosmosSearch for vector operations in DocumentDB
"cosmosSearch": {
# The query vector to search for
"vector": query_embedding,
# Field containing the document vectors to compare against
"path": vector_field,
# Number of final results to return
"k": top_k
}
}
},
{
# Add similarity score to the results
"$project": {
"document": "$$ROOT",
# Add search score from metadata
"score": {"$meta": "searchScore"}
}
}
]
# Execute the aggregation pipeline
results = list(collection.aggregate(pipeline))
return results
except Exception as e:
print(f"Error performing DiskANN vector search: {e}")
raise
def main():
# Load configuration from environment variables
config = {
'cluster_name': os.getenv('MONGO_CLUSTER_NAME'),
'database_name': 'Hotels',
'collection_name': 'hotels_diskann',
'data_file': os.getenv('DATA_FILE_WITH_VECTORS', '../data/Hotels_Vector.json'),
'vector_field': os.getenv('EMBEDDED_FIELD', 'DescriptionVector'),
'model_name': os.getenv('AZURE_OPENAI_EMBEDDING_MODEL', 'text-embedding-3-small'),
'dimensions': int(os.getenv('EMBEDDING_DIMENSIONS', '1536')),
'batch_size': int(os.getenv('LOAD_SIZE_BATCH', '100'))
}
try:
# Initialize clients
print("\nInitializing MongoDB and Azure OpenAI clients...")
mongo_client, azure_openai_client = get_clients_passwordless()
# Get database and collection
database = mongo_client[config['database_name']]
collection = database[config['collection_name']]
# Load data with embeddings
print(f"\nLoading data from {config['data_file']}...")
data = read_file_return_json(config['data_file'])
print(f"Loaded {len(data)} documents")
# Verify embeddings are present
documents_with_embeddings = [doc for doc in data if config['vector_field'] in doc]
if not documents_with_embeddings:
raise ValueError(f"No documents found with embeddings in field '{config['vector_field']}'. "
"Please run create_embeddings.py first.")
# Insert data into collection
print(f"\nInserting data into collection '{config['collection_name']}'...")
# Insert the hotel data
stats = insert_data(
collection,
documents_with_embeddings,
batch_size=config['batch_size']
)
if stats['inserted'] == 0 and not stats.get('skipped'):
raise ValueError("No documents were inserted successfully")
# Create DiskANN vector index (skip if data was already present)
if not stats.get('skipped'):
create_diskann_vector_index(
collection,
config['vector_field'],
config['dimensions']
)
# Wait briefly for index to be ready
import time
print("Waiting for index to be ready...")
time.sleep(2)
# Perform sample vector search
query = "quintessential lodging near running trails, eateries, retail"
results = perform_diskann_vector_search(
collection,
azure_openai_client,
query,
config['vector_field'],
config['model_name'],
top_k=5
)
# Display results
print_search_results(results, max_results=5, show_score=True)
except Exception as e:
print(f"\nError during DiskANN demonstration: {e}")
raise
finally:
# Close the MongoDB client
if 'mongo_client' in locals():
mongo_client.close()
if __name__ == "__main__":
main()
توفر هذه الوحدة الرئيسية هذه الميزات:
يتضمن وظائف الأداة المساعدة
إنشاء كائن تكوين لمتغيرات البيئة
ينشئ عملاء ل Azure OpenAI و Azure DocumentDB
يتصل ب MongoDB ، وينشئ قاعدة بيانات ومجموعة ، ويدرج البيانات ، وينشئ فهارس قياسية
ينشئ فهرس متجه باستخدام IVF أو HNSW أو DiskANN
ينشئ تضمينا لعينة نص استعلام باستخدام عميل OpenAI. يمكنك تغيير الاستعلام في أعلى الملف
تشغيل بحث متجه باستخدام التضمين وطباعة النتائج
إنشاء وظائف الأداة المساعدة
الصق الكود التالي في utils.py:
import json
import os
import time
import warnings
from typing import Dict, List, Any, Optional, Tuple
# Suppress the PyMongo CosmosDB cluster detection warning
# Must be set before importing pymongo
warnings.filterwarnings(
"ignore",
message="You appear to be connected to a CosmosDB cluster.*",
)
from pymongo import MongoClient, InsertOne
from pymongo.collection import Collection
from pymongo.errors import BulkWriteError
from azure.identity import DefaultAzureCredential
from pymongo.auth_oidc import OIDCCallback, OIDCCallbackContext, OIDCCallbackResult
from openai import AzureOpenAI
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
class AzureIdentityTokenCallback(OIDCCallback):
def __init__(self, credential):
self.credential = credential
def fetch(self, context: OIDCCallbackContext) -> OIDCCallbackResult:
token = self.credential.get_token(
"https://ossrdbms-aad.database.windows.net/.default").token
return OIDCCallbackResult(access_token=token)
def get_clients() -> Tuple[MongoClient, AzureOpenAI]:
# Get MongoDB connection string - required for DocumentDB access
mongo_connection_string = os.getenv("MONGO_CONNECTION_STRING")
if not mongo_connection_string:
raise ValueError("MONGO_CONNECTION_STRING environment variable is required")
# Create MongoDB client with optimized settings for DocumentDB
mongo_client = MongoClient(
mongo_connection_string,
maxPoolSize=50, # Allow up to 50 connections for better performance
minPoolSize=5, # Keep minimum 5 connections open
maxIdleTimeMS=30000, # Close idle connections after 30 seconds
serverSelectionTimeoutMS=5000, # 5 second timeout for server selection
socketTimeoutMS=20000 # 20 second socket timeout
)
# Get Azure OpenAI configuration
azure_openai_endpoint = os.getenv("AZURE_OPENAI_EMBEDDING_ENDPOINT")
azure_openai_key = os.getenv("AZURE_OPENAI_EMBEDDING_KEY")
if not azure_openai_endpoint or not azure_openai_key:
raise ValueError("Azure OpenAI endpoint and key are required")
# Create Azure OpenAI client for generating embeddings
azure_openai_client = AzureOpenAI(
azure_endpoint=azure_openai_endpoint,
api_key=azure_openai_key,
api_version=os.getenv("AZURE_OPENAI_EMBEDDING_API_VERSION", "2023-05-15")
)
return mongo_client, azure_openai_client
def get_clients_passwordless() -> Tuple[MongoClient, AzureOpenAI]:
# Get MongoDB cluster name for passwordless authentication
cluster_name = os.getenv("MONGO_CLUSTER_NAME")
if not cluster_name:
raise ValueError("MONGO_CLUSTER_NAME environment variable is required")
# Create credential object for Azure authentication
credential = DefaultAzureCredential()
authProperties = {"OIDC_CALLBACK": AzureIdentityTokenCallback(credential)}
# Create MongoDB client with Azure AD token callback
mongo_client = MongoClient(
f"mongodb+srv://{cluster_name}.global.mongocluster.cosmos.azure.com/",
connectTimeoutMS=120000,
tls=True,
retryWrites=True,
authMechanism="MONGODB-OIDC",
authMechanismProperties=authProperties
)
# Get Azure OpenAI endpoint
azure_openai_endpoint = os.getenv("AZURE_OPENAI_EMBEDDING_ENDPOINT")
if not azure_openai_endpoint:
raise ValueError("AZURE_OPENAI_EMBEDDING_ENDPOINT environment variable is required")
# Create Azure OpenAI client with credential-based authentication
azure_openai_client = AzureOpenAI(
azure_endpoint=azure_openai_endpoint,
azure_ad_token_provider=lambda: credential.get_token("https://cognitiveservices.azure.com/.default").token,
api_version=os.getenv("AZURE_OPENAI_EMBEDDING_API_VERSION", "2023-05-15")
)
return mongo_client, azure_openai_client
def azure_identity_token_callback(credential: DefaultAzureCredential) -> str:
# DocumentDB requires this specific scope
token_scope = "https://cosmos.azure.com/.default"
# Get token from Azure AD
token = credential.get_token(token_scope)
return token.token
def read_file_return_json(file_path: str) -> List[Dict[str, Any]]:
try:
with open(file_path, 'r', encoding='utf-8') as file:
return json.load(file)
except FileNotFoundError:
print(f"Error: File '{file_path}' not found")
raise
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON in file '{file_path}': {e}")
raise
def write_file_json(data: List[Dict[str, Any]], file_path: str) -> None:
try:
with open(file_path, 'w', encoding='utf-8') as file:
json.dump(data, file, indent=2, ensure_ascii=False)
print(f"Data successfully written to '{file_path}'")
except IOError as e:
print(f"Error writing to file '{file_path}': {e}")
raise
def insert_data(collection: Collection, data: List[Dict[str, Any]],
batch_size: int = 100, index_fields: Optional[List[str]] = None) -> Dict[str, int]:
total_documents = len(data)
# Check if data already exists in the collection
existing_count = collection.count_documents({})
if existing_count >= total_documents:
print(f"Collection already has {existing_count} documents, skipping insert and index creation")
return {'total': total_documents, 'inserted': 0, 'failed': 0, 'skipped': True}
# Clear existing data if counts don't match to ensure clean state
if existing_count > 0:
print(f"Collection has {existing_count} documents but expected {total_documents}, clearing and re-inserting...")
collection.delete_many({})
inserted_count = 0
failed_count = 0
print(f"Starting batch insertion of {total_documents} documents...")
# Create indexes if specified
if index_fields:
for field in index_fields:
try:
collection.create_index(field)
print(f"Created index on field: {field}")
except Exception as e:
print(f"Warning: Could not create index on {field}: {e}")
# Process data in batches to manage memory and error recovery
for i in range(0, total_documents, batch_size):
batch = data[i:i + batch_size]
batch_num = (i // batch_size) + 1
total_batches = (total_documents + batch_size - 1) // batch_size
try:
# Prepare bulk insert operations
operations = [InsertOne(document) for document in batch]
# Execute bulk insert
result = collection.bulk_write(operations, ordered=False)
inserted_count += result.inserted_count
print(f"Batch {batch_num} completed: {result.inserted_count} documents inserted")
except BulkWriteError as e:
# Handle partial failures in bulk operations
inserted_count += e.details.get('nInserted', 0)
failed_count += len(batch) - e.details.get('nInserted', 0)
print(f"Batch {batch_num} had errors: {e.details.get('nInserted', 0)} inserted, "
f"{failed_count} failed")
# Print specific error details for debugging
for error in e.details.get('writeErrors', []):
print(f" Error: {error.get('errmsg', 'Unknown error')}")
except Exception as e:
# Handle unexpected errors
failed_count += len(batch)
print(f"Batch {batch_num} failed completely: {e}")
# Small delay between batches to avoid overwhelming the database
time.sleep(0.1)
# Return summary statistics
stats = {
'total': total_documents,
'inserted': inserted_count,
'failed': failed_count
}
return stats
def drop_vector_indexes(collection, vector_field: str) -> None:
try:
# Get all indexes for the collection
indexes = list(collection.list_indexes())
# Find vector indexes on the specified field
vector_indexes = []
for index in indexes:
if 'key' in index and vector_field in index['key']:
if index['key'][vector_field] == 'cosmosSearch':
vector_indexes.append(index['name'])
# Drop each vector index found
for index_name in vector_indexes:
print(f"Dropping existing vector index: {index_name}")
collection.drop_index(index_name)
if vector_indexes:
print(f"Dropped {len(vector_indexes)} existing vector index(es)")
else:
print("No existing vector indexes found to drop")
except Exception as e:
print(f"Warning: Could not drop existing vector indexes: {e}")
# Continue anyway - the error might be that no indexes exist
def print_search_resultsx(results: List[Dict[str, Any]],
max_results: int = 5,
show_score: bool = True) -> None:
if not results:
print("No search results found.")
return
print(f"\nSearch Results (showing top {min(len(results), max_results)}):")
print("=" * 80)
for i, result in enumerate(results[:max_results], 1):
# Display hotel name and ID
print(f"HotelName: {result['HotelName']}, Score: {result['score']:.4f}")
def print_search_results(results: List[Dict[str, Any]],
max_results: int = 5,
show_score: bool = True) -> None:
if not results:
print("No search results found.")
return
print(f"\nSearch Results (showing top {min(len(results), max_results)}):")
print("=" * 80)
for i, result in enumerate(results[:max_results], 1):
# Check if results are nested under 'document' (when using $$ROOT)
if 'document' in result:
doc = result['document']
else:
doc = result
# Display hotel name and ID
print(f"HotelName: {doc['HotelName']}, Score: {result['score']:.4f}")
if len(results) > max_results:
print(f"\n... and {len(results) - max_results} more results")
توفر وحدة الأداة المساعدة هذه الميزات:
-
get_clients: ينشئ ويعيد العملاء ل Azure OpenAI و Azure DocumentDB -
get_clients_passwordless: ينشئ ويعيد العملاء ل Azure OpenAI و Azure DocumentDB باستخدام المصادقة بدون كلمة مرور: -
azure_identity_token_callback: يحصل على رمز Azure AD المستخدم في مصادقة OIDC في MongoDB -
read_file_return_jsonيقرأ ملف JSON ويعيد محتوياته كمصفوفة من الكائنات: -
write_file_jsonيكتب مصفوفة من الكائنات إلى ملف JSON: -
insert_dataإدراج البيانات على دفعات في مجموعة MongoDB وإنشاء فهارس قياسية في حقول محددة: -
drop_vector_indexes: يسقط مؤشرات المتجهات الموجودة على حقل المتجه المستهدف: -
print_search_results: يطبع نتائج البحث المتجهة، بما في ذلك النتيجة واسم الفندق
المصادقة باستخدام Azure CLI
سجل الدخول إلى Azure CLI قبل تشغيل التطبيق حتى يتمكن من الوصول إلى موارد Azure بأمان.
az login
شغّل التطبيق
لتشغيل البرامج النصية Python:
ترى أفضل خمسة فنادق تطابق استعلام البحث المتجه ودرجات التشابه بينهما.
عرض البيانات وإدارتها في Visual Studio Code
اختر إضافة DocumentDB في كود Visual Studio للاتصال بحساب Azure DocumentDB الخاص بك.
عرض البيانات والفهارس في قاعدة بيانات الفنادق.
تنظيف الموارد
احذف مجموعة الموارد، وحساب Azure DocumentDB، ومورد Azure OpenAI عندما لا تحتاجها لتجنب التكاليف الإضافية.