Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Microsoft Fabric'te Cosmos DB'den yazma ve sorgulama verilerini okumak için Microsoft Fabric'teki bir Python not defterinde Cosmos DB Python SDK'sını kullanabilirsiniz. Ayrıca Cosmos DB kapsayıcıları oluşturabilir ve yönetebilirsiniz.
Spark bağlayıcısını kullanmak, işlemleri gerçekleştirmek için doğrudan Cosmos DB uç noktasına bağlandığından, OneLake'de depolanan Doku yansıtılmış verilerinde Cosmos DB'den verileri okumak için Spark kullanmaktan farklıdır.
İpucu
GitHub'daki Microsoft Fabric Örnekleri'nde Cosmos DB'den tam örneği indirin.
Önkoşullar
Mevcut doku kapasitesi
- Eğer Fabric kapasiteniz yoksa, bir Fabric deneme sürümü başlatın.
Fabric üzerinde mevcut bir Cosmos DB veritabanı
- Henüz bir veritabanınız yoksa Fabric'de yeni bir Cosmos DB veritabanı oluşturun.
Veri içeren mevcut bir kapsayıcı
- Henüz bir kapsayıcınız yoksa örnek veri kapsayıcısını yüklemenizi öneririz.
Uyarı
Bu makalede SampleData kapsayıcı adıyla oluşturulan yerleşik Cosmos DB örneği kullanılmaktadır.
Cosmos DB uç noktasını alma
İlk olarak, Fabric içerisindeki Cosmos DB veritabanının uç noktasını alın. Cosmos DB Spark Bağlayıcısı'nı kullanarak bağlanmak için bu uç nokta gereklidir.
Fabric portalını (https://app.fabric.microsoft.com) açın.
Mevcut Cosmos DB veritabanınıza gidin.
Veritabanının menü çubuğunda Ayarlar seçeneğini belirleyin.
Ayarlar iletişim kutusunda Bağlantı bölümüne gidin. Ardından Cosmos DB NoSQL için Uç Nokta veritabanı alanının değerini kopyalayın. Bu değeri sonraki adımlarda kullanırsınız.
Cosmos DB Python SDK paketini yükleme
Not defterinize azure-cosmos paketini yükleyin. Bu sürüm 4.14.0 veya üzeri olmalıdır.
[1] hücresi:
#Install packages %pip install azure-cosmos
Kitaplıkları içeri aktarma ve yapılandırma değerlerini ayarlama
Paketleri not defterinize aktarın. Bu ve diğer örneklerde Cosmos DB için zaman uyumsuz kitaplığı kullanırız. Ardından önceki adımda kaydettiğiniz Cosmos DB uç noktasını, veritabanı adını ve kapsayıcı adını uygulayın.
[2] hücresi:
#Imports and config values import logging from azure.cosmos.aio import CosmosClient from azure.cosmos.exceptions import CosmosHttpResponseError COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/' COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}' COSMOS_CONTAINER_NAME = '{your-container-name}'
Kimlik doğrulamak için özel bir belirteç kimlik bilgisi oluşturun
Kullanıcının kimliğini doğrulamak için gereken Fabric NotebookUtils kimlik bilgisi yardımcı programları tarafından oluşturulan belirteç dizesinden Cosmos DB SDK'sı için geçerli bir kimlik bilgisi nesnesi oluşturmak için bir FabricTokenCredential() nesnesi oluşturun.
[DİKKAT!] Microsoft Fabric not defterleri Azure Kimlik Bilgileri nesnelerini yerel olarak desteklemez. Microsoft Fabric'te Cosmos DB'de kimlik doğrulaması yapmak için kullanamazsınız
DefaultAzureCredential().[3] hücresi:
# Custom TokenCredential implementation for Fabric authentication in a notebook %pip install azure-core from azure.core.credentials import TokenCredential, AccessToken import base64 import json import notebookutils from datetime import datetime, timezone class FabricTokenCredential(TokenCredential): def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, enable_cae: bool = False, **kwargs: Any) -> AccessToken: access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/") parts = access_token.split(".") if len(parts) < 2: raise ValueError("Invalid JWT format") payload_b64 = parts[1] # Fix padding padding = (-len(payload_b64)) % 4 if padding: payload_b64 += "=" * padding payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8") payload = json.loads(payload_json) exp = payload.get("exp") if exp is None: raise ValueError("exp claim missing in token") return AccessToken(token=access_token, expires_on=exp)Asenkron bir Cosmos DB istemci nesnesi ve analiz not defterinde kullanmak üzere Cosmos DB kapsayıcısına referans oluşturun.
[4] hücresi:
# Initialize Cosmos DB client with custom credential and container object COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential()) DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME) CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)Cosmos DB kapsayıcısını sorgulamak için eşzamansız bir işlev oluşturun
Hücre [5]:
#Define function to search for all products by category name async def search_products(categoryName: str) -> List[Dict[str, Any]]: try: # Use parameterized query query = """ SELECT * FROM c WHERE c.categoryName = @categoryName AND c.docType = @docType """ # Set the parameter values parameters = [ {"name": "@docType", "value": "product"}, {"name": "@categoryName", "value": categoryName} ] # Async query: gather results into a list products = [p async for p in CONTAINER.query_items( query=query, parameters=parameters )] return products except CosmosHttpResponseError as e: logging.error(f"Cosmos DB query failed: {e}") raise except Exception as e: logging.error(f"Unexpected error in search_products: {e}") raiseSorgu sonuçlarını döndürmek için yeni tanımlanan asenkron işlevi çağırın
[6] hücresi:
# Search for products in a category products = await search_products(categoryName="Computers, Laptops") display(products) #For tabular output pprint(products) #Json friendly output