Del via


Arbeid med Cosmos DB i en Python-notebook i Microsoft Fabric

Du kan bruke Cosmos DB Python SDK i en Python-notatbok i Microsoft Fabric for å lese, skrive og spørre data fra Cosmos DB i Microsoft Fabric. Du kan også opprette og administrere Cosmos DB-containere.

Å bruke Spark connector er annerledes enn å bruke Spark til å lese data fra Cosmos DB i Fabric speilet data lagret i OneLake, siden det kobles direkte til Cosmos DB-endepunktet for å utføre operasjoner.

Tips

Last ned det komplette eksempelet fra Cosmos DB i Microsoft Fabric Samples på GitHub.

Forutsetninger

Note

Denne artikkelen bruker det innebygde Cosmos DB-eksempelet laget med containernavnet SampleData.

Hent Cosmos DB-endepunkt

Først, hent endepunktet for Cosmos DB-databasen i Fabric. Dette endepunktet kreves for å koble til via Cosmos DB Spark Connector.

  1. Åpne Stoffportalen (https://app.fabric.microsoft.com).

  2. Gå til den eksisterende Cosmos DB-databasen.

  3. Velg innstillingsvalget i menylinjen for databasen.

    Skjermbilde av menylinjen 'Innstillinger' for en database i Fabric-portalen.

  4. I innstillingsdialogen, naviger til Tilkoblingsseksjonen. Deretter kopierer du verdien i Endpoint for Cosmos DB NoSQL-databasefeltet . Du bruker denne verdien i senere trinn.

    Skjermbilde av 'Connection'-seksjonen i 'Innstillinger'-dialogen for en database i Fabric-portalen.

Installer Cosmos DB Python SDK-pakken

  • Installer Azure-Cosmos-pakken i notatboken din. Dette bør være versjon 4.14.0 eller nyere.

    Celle [1]:

    #Install packages
    %pip install azure-cosmos
    

Importer biblioteker og sett konfigurasjonsverdier

  • Importer pakkene til notatboken din. I dette og andre eksempler bruker vi det asynkrone biblioteket for Cosmos DB. Deretter anvender du Cosmos DB-endepunktet, databasenavnet og containernavnet du lagret i et tidligere steg.

    Celle [2]:

    #Imports and config values
    import logging
    
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos.exceptions import CosmosHttpResponseError
    
    COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
    COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
    COSMOS_CONTAINER_NAME = '{your-container-name}'
    

Opprett en egendefinert token-legitimasjon for autentisering

  1. Lag et FabricTokenCredential()-objekt for å produsere et gyldig legitimasjonsobjekt for Cosmos DB SDK fra tokenstrengen generert av Fabric NotebookUtils-legitimasjonsverktøyene som kreves for å autentisere en bruker.

    [OBS!] Microsoft Fabric-notatbøker støtter ikke Azure Credential-objekter nativt. Du kan ikke bruke DefaultAzureCredential() den til å autentisere til Cosmos DB i Microsoft Fabric.

    Celle [3]:

    # Custom TokenCredential implementation for Fabric authentication in a notebook
    %pip install azure-core
    from azure.core.credentials import TokenCredential, AccessToken
    import base64
    import json
    import notebookutils
    from datetime import datetime, timezone
    
    class FabricTokenCredential(TokenCredential):
    
       def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                      enable_cae: bool = False, **kwargs: Any) -> AccessToken:
          access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")
          parts = access_token.split(".")
          if len(parts) < 2:
                raise ValueError("Invalid JWT format")
          payload_b64 = parts[1]
          # Fix padding
          padding = (-len(payload_b64)) % 4
          if padding:
                payload_b64 += "=" * padding
          payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
          payload = json.loads(payload_json)
          exp = payload.get("exp")
          if exp is None:
                raise ValueError("exp claim missing in token")
          return AccessToken(token=access_token, expires_on=exp) 
    
  2. Lag et asynkront Cosmos DB-klientobjekt og en referanse til Cosmos DB-containeren for bruk i en notatbok.

    Celle [4]:

    # Initialize Cosmos DB client with custom credential and container object
    COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
    DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
    CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)
    
  3. Lag en asynkron funksjon for å spørre Cosmos DB-containeren

    Celle [5]:

    #Define function to search for all products by category name
    async def search_products(categoryName: str) -> List[Dict[str, Any]]:
    
       try:
          # Use parameterized query
          query = """
             SELECT 
                *
             FROM c 
             WHERE 
                c.categoryName = @categoryName AND
                c.docType = @docType
          """
    
          # Set the parameter values
          parameters = [
                {"name": "@docType", "value": "product"},
                {"name": "@categoryName", "value": categoryName}
          ]
    
          # Async query: gather results into a list
          products = [p async for p in CONTAINER.query_items(
                query=query,
                parameters=parameters
          )]
    
          return products
    
       except CosmosHttpResponseError as e:
          logging.error(f"Cosmos DB query failed: {e}")
          raise
       except Exception as e:
          logging.error(f"Unexpected error in search_products: {e}")
          raise
    
  4. Kall den nylig definerte asynkrone funksjonen for å returnere resultatene av spørringen

    Celle [6]:

       # Search for products in a category
       products = await search_products(categoryName="Computers, Laptops")
    
       display(products) #For tabular output
       pprint(products) #Json friendly output