Share via


Werken met Cosmos DB in een Python-notebook in Microsoft Fabric

U kunt de Cosmos DB Python SDK in een Python-notebook in Microsoft Fabric gebruiken om schrijf- en querygegevens uit Cosmos DB in Microsoft Fabric te lezen en er query's op uit te voeren. U kunt ook Cosmos DB-containers maken en beheren.

Het gebruik van de Spark-connector verschilt van het gebruik van Spark om gegevens te lezen uit de in Fabric gespiegelde gegevens die zijn opgeslagen in OneLake, omdat het rechtstreeks verbinding maakt met het Cosmos DB-eindpunt om bewerkingen uit te voeren.

Aanbeveling

Download het volledige voorbeeld uit Cosmos DB in Microsoft Fabric Samples op GitHub.

Vereiste voorwaarden

Opmerking

In dit artikel wordt het ingebouwde Cosmos DB-voorbeeld gebruikt dat is gemaakt met een containernaam van SampleData.

Cosmos DB-eindpunt ophalen

Haal eerst het eindpunt op voor de Cosmos DB-database in Fabric. Dit eindpunt is vereist om verbinding te maken met behulp van de Cosmos DB Spark-connector.

  1. Open de Fabric-portal (https://app.fabric.microsoft.com).

  2. Navigeer naar uw bestaande Cosmos DB-database.

  3. Selecteer de optie Instellingen in de menubalk voor de database.

    Schermopname van de menubalkoptie Instellingen voor een database in de Fabric-portal.

  4. Navigeer in het dialoogvenster Instellingen naar de sectie Verbinding . Kopieer vervolgens de waarde van het veld Eindpunt voor Cosmos DB NoSQL-database . U gebruikt deze waarde in latere stap[s].

    Schermopname van de sectie Verbinding van het dialoogvenster Instellingen voor een database in de Fabric-portal.

Het Python SDK-pakket van Cosmos DB installeren

  • Installeer het azure-cosmos-pakket in uw notebook. Dit moet versie 4.14.0 of hoger zijn.

    Cel [1]:

    #Install packages
    %pip install azure-cosmos
    

Bibliotheken importeren en configuratiewaarden instellen

  • Importeer de pakketten in uw notebook. In deze en andere voorbeelden gebruiken we de asynchrone bibliotheek voor Cosmos DB. Pas vervolgens het Cosmos DB-eindpunt, de databasenaam en de containernaam toe die u in een vorige stap hebt opgeslagen.

    Cel [2]:

    #Imports and config values
    import logging
    
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos.exceptions import CosmosHttpResponseError
    
    COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
    COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
    COSMOS_CONTAINER_NAME = '{your-container-name}'
    

Een aangepaste tokenreferentie maken om te verifiëren

  1. Maak een FabricTokenCredential()-object om een geldig referentieobject voor de Cosmos DB SDK te produceren op basis van de tokentekenreeks die is gegenereerd door de referentiehulpprogramma's van Fabric NotebookUtils die vereist zijn om een gebruiker te verifiëren.

    [OPMERKING!] Microsoft Fabric-notebooks bieden geen systeemeigen ondersteuning voor Azure Credential-objecten. U kunt niet gebruiken DefaultAzureCredential() om te verifiëren bij Cosmos DB in Microsoft Fabric.

    Cel [3]:

    # Custom TokenCredential implementation for Fabric authentication in a notebook
    %pip install azure-core
    from azure.core.credentials import TokenCredential, AccessToken
    import base64
    import json
    import notebookutils
    from datetime import datetime, timezone
    
    class FabricTokenCredential(TokenCredential):
    
       def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                      enable_cae: bool = False, **kwargs: Any) -> AccessToken:
          access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")
          parts = access_token.split(".")
          if len(parts) < 2:
                raise ValueError("Invalid JWT format")
          payload_b64 = parts[1]
          # Fix padding
          padding = (-len(payload_b64)) % 4
          if padding:
                payload_b64 += "=" * padding
          payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
          payload = json.loads(payload_json)
          exp = payload.get("exp")
          if exp is None:
                raise ValueError("exp claim missing in token")
          return AccessToken(token=access_token, expires_on=exp) 
    
  2. Maak een asynchroon Cosmos DB-clientobject en een verwijzing naar de Cosmos DB-container die in een notebook moet worden gebruikt.

    Cel [4]:

    # Initialize Cosmos DB client with custom credential and container object
    COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
    DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
    CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)
    
  3. Een asynchrone functie maken om een query uit te voeren op de Cosmos DB-container

    Cel [5]:

    #Define function to search for all products by category name
    async def search_products(categoryName: str) -> List[Dict[str, Any]]:
    
       try:
          # Use parameterized query
          query = """
             SELECT 
                *
             FROM c 
             WHERE 
                c.categoryName = @categoryName AND
                c.docType = @docType
          """
    
          # Set the parameter values
          parameters = [
                {"name": "@docType", "value": "product"},
                {"name": "@categoryName", "value": categoryName}
          ]
    
          # Async query: gather results into a list
          products = [p async for p in CONTAINER.query_items(
                query=query,
                parameters=parameters
          )]
    
          return products
    
       except CosmosHttpResponseError as e:
          logging.error(f"Cosmos DB query failed: {e}")
          raise
       except Exception as e:
          logging.error(f"Unexpected error in search_products: {e}")
          raise
    
  4. Roep de zojuist gedefinieerde asynchrone functie aan om de resultaten van de query te retourneren

    Cel [6]:

       # Search for products in a category
       products = await search_products(categoryName="Computers, Laptops")
    
       display(products) #For tabular output
       pprint(products) #Json friendly output