Aracılığıyla paylaş


Microsoft Fabric'te Python not defterinde Cosmos DB ile çalışma

Microsoft Fabric'te Cosmos DB'den yazma ve sorgulama verilerini okumak için Microsoft Fabric'teki bir Python not defterinde Cosmos DB Python SDK'sını kullanabilirsiniz. Ayrıca Cosmos DB kapsayıcıları oluşturabilir ve yönetebilirsiniz.

Spark bağlayıcısını kullanmak, işlemleri gerçekleştirmek için doğrudan Cosmos DB uç noktasına bağlandığından, OneLake'de depolanan Doku yansıtılmış verilerinde Cosmos DB'den verileri okumak için Spark kullanmaktan farklıdır.

Önkoşullar

Uyarı

Bu makalede SampleData kapsayıcı adıyla oluşturulan yerleşik Cosmos DB örneği kullanılmaktadır.

Cosmos DB uç noktasını alma

İlk olarak, Fabric içerisindeki Cosmos DB veritabanının uç noktasını alın. Cosmos DB Spark Bağlayıcısı'nı kullanarak bağlanmak için bu uç nokta gereklidir.

  1. Fabric portalını (https://app.fabric.microsoft.com) açın.

  2. Mevcut Cosmos DB veritabanınıza gidin.

  3. Veritabanının menü çubuğunda Ayarlar seçeneğini belirleyin.

    Doku portalındaki bir veritabanı için 'Ayarlar' menü çubuğu seçeneğinin ekran görüntüsü.

  4. Ayarlar iletişim kutusunda Bağlantı bölümüne gidin. Ardından Cosmos DB NoSQL için Uç Nokta veritabanı alanının değerini kopyalayın. Bu değeri sonraki adımlarda kullanırsınız.

    Fabric portalındaki bir veritabanı için 'Ayarlar' iletişim kutusunun 'Bağlantı' bölümünün ekran görüntüsü.

Cosmos DB Python SDK paketini yükleme

  • Not defterinize azure-cosmos paketini yükleyin. Bu sürüm 4.14.0 veya üzeri olmalıdır.

    [1] hücresi:

    #Install packages
    %pip install azure-cosmos
    

Kitaplıkları içeri aktarma ve yapılandırma değerlerini ayarlama

  • Paketleri not defterinize aktarın. Bu ve diğer örneklerde Cosmos DB için zaman uyumsuz kitaplığı kullanırız. Ardından önceki adımda kaydettiğiniz Cosmos DB uç noktasını, veritabanı adını ve kapsayıcı adını uygulayın.

    [2] hücresi:

    #Imports and config values
    import logging
    
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos.exceptions import CosmosHttpResponseError
    
    COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
    COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
    COSMOS_CONTAINER_NAME = '{your-container-name}'
    

Kimlik doğrulamak için özel bir belirteç kimlik bilgisi oluşturun

  1. Kullanıcının kimliğini doğrulamak için gereken Fabric NotebookUtils kimlik bilgisi yardımcı programları tarafından oluşturulan belirteç dizesinden Cosmos DB SDK'sı için geçerli bir kimlik bilgisi nesnesi oluşturmak için bir FabricTokenCredential() nesnesi oluşturun.

    [DİKKAT!] Microsoft Fabric not defterleri Azure Kimlik Bilgileri nesnelerini yerel olarak desteklemez. Microsoft Fabric'te Cosmos DB'de kimlik doğrulaması yapmak için kullanamazsınız DefaultAzureCredential() .

    [3] hücresi:

    # Custom TokenCredential implementation for Fabric authentication in a notebook
    %pip install azure-core
    from azure.core.credentials import TokenCredential, AccessToken
    import base64
    import json
    import notebookutils
    from datetime import datetime, timezone
    
    class FabricTokenCredential(TokenCredential):
    
       def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                      enable_cae: bool = False, **kwargs: Any) -> AccessToken:
          access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")
          parts = access_token.split(".")
          if len(parts) < 2:
                raise ValueError("Invalid JWT format")
          payload_b64 = parts[1]
          # Fix padding
          padding = (-len(payload_b64)) % 4
          if padding:
                payload_b64 += "=" * padding
          payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
          payload = json.loads(payload_json)
          exp = payload.get("exp")
          if exp is None:
                raise ValueError("exp claim missing in token")
          return AccessToken(token=access_token, expires_on=exp) 
    
  2. Asenkron bir Cosmos DB istemci nesnesi ve analiz not defterinde kullanmak üzere Cosmos DB kapsayıcısına referans oluşturun.

    [4] hücresi:

    # Initialize Cosmos DB client with custom credential and container object
    COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
    DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
    CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)
    
  3. Cosmos DB kapsayıcısını sorgulamak için eşzamansız bir işlev oluşturun

    Hücre [5]:

    #Define function to search for all products by category name
    async def search_products(categoryName: str) -> List[Dict[str, Any]]:
    
       try:
          # Use parameterized query
          query = """
             SELECT 
                *
             FROM c 
             WHERE 
                c.categoryName = @categoryName AND
                c.docType = @docType
          """
    
          # Set the parameter values
          parameters = [
                {"name": "@docType", "value": "product"},
                {"name": "@categoryName", "value": categoryName}
          ]
    
          # Async query: gather results into a list
          products = [p async for p in CONTAINER.query_items(
                query=query,
                parameters=parameters
          )]
    
          return products
    
       except CosmosHttpResponseError as e:
          logging.error(f"Cosmos DB query failed: {e}")
          raise
       except Exception as e:
          logging.error(f"Unexpected error in search_products: {e}")
          raise
    
  4. Sorgu sonuçlarını döndürmek için yeni tanımlanan asenkron işlevi çağırın

    [6] hücresi:

       # Search for products in a category
       products = await search_products(categoryName="Computers, Laptops")
    
       display(products) #For tabular output
       pprint(products) #Json friendly output