Del via


Arbejd med Cosmos DB i en Python-notebook i Microsoft Fabric

Du kan bruge Cosmos DB Python SDK i en Python-notebook i Microsoft Fabric til at læse, skrive og forespørge data fra Cosmos DB i Microsoft Fabric. Du kan også oprette og administrere Cosmos DB-containere.

Brugen af Spark-connector adskiller sig fra at bruge Spark til at læse data fra Cosmos DB i Fabric-spejlede data, der er gemt i OneLake, da den forbinder direkte til Cosmos DB-endpointet for at udføre operationer.

Tips

Download det komplette eksempel fra Cosmos DB i Microsoft Fabric Samples på GitHub.

Forudsætninger

Notat

Denne artikel bruger det indbyggede Cosmos DB-eksempel oprettet med containernavnet SampleData.

Hent Cosmos DB-slutpunkt

Først skal du hente slutpunktet for Cosmos DB-databasen i Fabric. Dette endepunkt er nødvendigt for at kunne forbinde via Cosmos DB Spark Connector.

  1. Åbn Fabric-portalen (https://app.fabric.microsoft.com).

  2. Gå til din eksisterende Cosmos DB-database.

  3. Vælg indstillingen Indstillinger i menulinjen for databasen.

    Skærmbillede af menulinjeindstillingen 'Indstillinger' for en database på Fabric-portalen.

  4. I indstillingsdialogen skal du navigere til afsnittet Forbindelse . Kopiér derefter værdien af slutpunktet for Cosmos DB NoSQL-databasefeltet . Du bruger denne værdi i senere trin[s].

    Skærmbillede af afsnittet 'Forbindelse' i dialogboksen 'Indstillinger' for en database på Fabric-portalen.

Installer Cosmos DB Python SDK-pakken

  • Installer azure-cosmos-pakken i din notebook. Dette burde være version 4.14.0 eller nyere.

    Celle [1]:

    #Install packages
    %pip install azure-cosmos
    

Importbiblioteker og sæt konfigurationsværdier

  • Importer pakkerne i din notesbog. I dette og andre eksempler bruger vi det asynkrone bibliotek til Cosmos DB. Derefter anvender du Cosmos DB-endpointet, databasenavnet og containernavnet, som du har gemt i et tidligere trin.

    Celle [2]:

    #Imports and config values
    import logging
    
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos.exceptions import CosmosHttpResponseError
    
    COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
    COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
    COSMOS_CONTAINER_NAME = '{your-container-name}'
    

Opret en brugerdefineret token-legitimation til autentificering

  1. Opret et FabricTokenCredential()-objekt for at producere et gyldigt legitimationsobjekt til Cosmos DB SDK ud fra tokenstrengen genereret af Fabric NotebookUtils-legitimationsværktøjerne , som er nødvendige for at autentificere en bruger.

    [BEMÆRK!] Microsoft Fabric-notebooks understøtter ikke Azure Credential-objekter indbygget. Du kan ikke bruge DefaultAzureCredential() den til at autentificere til Cosmos DB i Microsoft Fabric.

    Celle [3]:

    # Custom TokenCredential implementation for Fabric authentication in a notebook
    %pip install azure-core
    from azure.core.credentials import TokenCredential, AccessToken
    import base64
    import json
    import notebookutils
    from datetime import datetime, timezone
    
    class FabricTokenCredential(TokenCredential):
    
       def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                      enable_cae: bool = False, **kwargs: Any) -> AccessToken:
          access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")
          parts = access_token.split(".")
          if len(parts) < 2:
                raise ValueError("Invalid JWT format")
          payload_b64 = parts[1]
          # Fix padding
          padding = (-len(payload_b64)) % 4
          if padding:
                payload_b64 += "=" * padding
          payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
          payload = json.loads(payload_json)
          exp = payload.get("exp")
          if exp is None:
                raise ValueError("exp claim missing in token")
          return AccessToken(token=access_token, expires_on=exp) 
    
  2. Opret et asynkront Cosmos DB-klientobjekt og en reference til Cosmos DB-containeren til brug i en notesbog.

    Celle [4]:

    # Initialize Cosmos DB client with custom credential and container object
    COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
    DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
    CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)
    
  3. Opret en asynkron funktion til at forespørge Cosmos DB-containeren

    Celle [5]:

    #Define function to search for all products by category name
    async def search_products(categoryName: str) -> List[Dict[str, Any]]:
    
       try:
          # Use parameterized query
          query = """
             SELECT 
                *
             FROM c 
             WHERE 
                c.categoryName = @categoryName AND
                c.docType = @docType
          """
    
          # Set the parameter values
          parameters = [
                {"name": "@docType", "value": "product"},
                {"name": "@categoryName", "value": categoryName}
          ]
    
          # Async query: gather results into a list
          products = [p async for p in CONTAINER.query_items(
                query=query,
                parameters=parameters
          )]
    
          return products
    
       except CosmosHttpResponseError as e:
          logging.error(f"Cosmos DB query failed: {e}")
          raise
       except Exception as e:
          logging.error(f"Unexpected error in search_products: {e}")
          raise
    
  4. Kald den nyligt definerede asynkrone funktion for at returnere resultaterne af forespørgslen

    Celle [6]:

       # Search for products in a category
       products = await search_products(categoryName="Computers, Laptops")
    
       display(products) #For tabular output
       pprint(products) #Json friendly output