Udostępnij przez


Praca z usługą Cosmos DB w notesie języka Python w usłudze Microsoft Fabric

Zestaw SDK języka Python usługi Cosmos DB można używać w notesie języka Python w usłudze Microsoft Fabric do odczytywania, zapisywania i wykonywania zapytań dotyczących danych z usługi Cosmos DB w Microsoft Fabric. Można również tworzyć kontenery usługi Cosmos DB i zarządzać nimi.

Używanie łącznika Spark różni się od używania Spark do odczytywania danych z Cosmos DB w Fabric z dublowanymi danymi przechowywanymi w OneLake, ponieważ łączy się bezpośrednio z punktem końcowym Cosmos DB w celu wykonywania operacji.

Wskazówka

Pobierz kompletny przykład z usługi Cosmos DB w przykładach usługi Microsoft Fabric w witrynie GitHub.

Wymagania wstępne

Uwaga / Notatka

W tym artykule użyto wbudowanego przykładu usługi Cosmos DB utworzonego z nazwą kontenera SampleData.

Pobierz punkt końcowy usługi Cosmos DB

Najpierw uzyskaj punkt końcowy dla bazy danych Cosmos DB w Fabric. Ten punkt końcowy jest wymagany do nawiązania połączenia przy użyciu łącznika Spark usługi Cosmos DB.

  1. Otwórz portal Fabric (https://app.fabric.microsoft.com).

  2. Przejdź do istniejącej bazy danych Cosmos DB.

  3. Wybierz opcję Ustawienia na pasku menu bazy danych.

    Zrzut ekranu przedstawiający opcję paska menu

  4. W oknie dialogowym ustawień przejdź do sekcji Połączenie . Następnie skopiuj wartość pola Punkt końcowy dla bazy danych NoSQL usługi Cosmos DB . Ta wartość jest używana w późniejszym kroku lub krokach.

    Zrzut ekranu przedstawiający sekcję

Instalowanie pakietu zestawu SDK języka Python usługi Cosmos DB

  • Zainstaluj bibliotekę azure-cosmos w notebooku. Powinna to być wersja 4.14.0 lub nowsza.

    Komórka [1]:

    #Install packages
    %pip install azure-cosmos
    

Importowanie bibliotek i ustawianie wartości konfiguracji

  • Zaimportuj pakiety do notesu. W tym i innych przykładach używamy biblioteki asynchronicznej do Cosmos DB. Następnie zastosuj punkt końcowy usługi Cosmos DB, nazwę bazy danych i nazwę kontenera zapisaną w poprzednim kroku.

    Komórka [2]:

    #Imports and config values
    import logging
    
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos.exceptions import CosmosHttpResponseError
    
    COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
    COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
    COSMOS_CONTAINER_NAME = '{your-container-name}'
    

Utwórz niestandardowe poświadczenia tokenowe w celu uwierzytelnienia

  1. Utwórz obiekt FabricTokenCredential(), aby utworzyć prawidłowy obiekt poświadczeń dla zestawu SDK usługi Cosmos DB z ciągu tokenu wygenerowanego przez narzędzia poświadczeń Fabric NotebookUtils wymagane do uwierzytelnienia użytkownika.

    [UWAGA!] Notatniki Microsoft Fabric nie obsługują natywnie obiektów poświadczeń Azure. Nie można użyć DefaultAzureCredential() do uwierzytelniania do usługi Cosmos DB w usłudze Microsoft Fabric.

    Komórka [3]:

    # Custom TokenCredential implementation for Fabric authentication in a notebook
    %pip install azure-core
    from azure.core.credentials import TokenCredential, AccessToken
    import base64
    import json
    import notebookutils
    from datetime import datetime, timezone
    
    class FabricTokenCredential(TokenCredential):
    
       def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                      enable_cae: bool = False, **kwargs: Any) -> AccessToken:
          access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")
          parts = access_token.split(".")
          if len(parts) < 2:
                raise ValueError("Invalid JWT format")
          payload_b64 = parts[1]
          # Fix padding
          padding = (-len(payload_b64)) % 4
          if padding:
                payload_b64 += "=" * padding
          payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
          payload = json.loads(payload_json)
          exp = payload.get("exp")
          if exp is None:
                raise ValueError("exp claim missing in token")
          return AccessToken(token=access_token, expires_on=exp) 
    
  2. Utwórz obiekt klienta asynchronicznego usługi Cosmos DB i odwołanie do kontenera usługi Cosmos DB do użycia w notesie.

    Komórka [4]:

    # Initialize Cosmos DB client with custom credential and container object
    COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
    DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
    CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)
    
  3. Tworzenie funkcji asynchronicznych w celu wykonywania zapytań względem kontenera usługi Cosmos DB

    Komórka [5]:

    #Define function to search for all products by category name
    async def search_products(categoryName: str) -> List[Dict[str, Any]]:
    
       try:
          # Use parameterized query
          query = """
             SELECT 
                *
             FROM c 
             WHERE 
                c.categoryName = @categoryName AND
                c.docType = @docType
          """
    
          # Set the parameter values
          parameters = [
                {"name": "@docType", "value": "product"},
                {"name": "@categoryName", "value": categoryName}
          ]
    
          # Async query: gather results into a list
          products = [p async for p in CONTAINER.query_items(
                query=query,
                parameters=parameters
          )]
    
          return products
    
       except CosmosHttpResponseError as e:
          logging.error(f"Cosmos DB query failed: {e}")
          raise
       except Exception as e:
          logging.error(f"Unexpected error in search_products: {e}")
          raise
    
  4. Wywołaj nowo zdefiniowaną funkcję async, aby zwrócić wyniki zapytania

    Komórka [6]:

       # Search for products in a category
       products = await search_products(categoryName="Computers, Laptops")
    
       display(products) #For tabular output
       pprint(products) #Json friendly output