Condividi tramite


Lavorare con Cosmos DB in un notebook Python in Microsoft Fabric

È possibile usare il Cosmos DB Python SDK in un notebook Python all'interno di Microsoft Fabric per leggere ed eseguire query sui dati da Cosmos DB in Microsoft Fabric. È anche possibile creare e gestire contenitori Cosmos DB.

L'uso del connettore Spark è diverso dall'uso di Spark per leggere i dati di Cosmos DB in Fabric con dati in mirroring archiviati in OneLake, perché si connette direttamente all'endpoint di Cosmos DB per eseguire operazioni.

Suggerimento

Scarica l'esempio completo da Cosmos DB in Microsoft Fabric Samples su GitHub.

Prerequisiti

Annotazioni

Questo articolo usa l'esempio predefinito di Cosmos DB creato con un nome contenitore SampleData.

Recuperare l'endpoint di Cosmos DB

Ottenere prima di tutto l'endpoint per il database Cosmos DB in Fabric. Questo endpoint è necessario per connettersi usando il connettore Spark di Cosmos DB.

  1. Apri il portale Fabric (https://app.fabric.microsoft.com).

  2. Navigare al database Cosmos DB esistente.

  3. Selezionare l'opzione Impostazioni nella barra dei menu per il database.

    Screenshot dell'opzione della barra dei menu

  4. Nella finestra di dialogo impostazioni passare alla sezione Connessione . Copia poi il valore del campo Endpoint del database NoSQL per Cosmos DB. Questo valore viene usato nei passaggi successivi.

    Screenshot della sezione

Installare il pacchetto Python SDK di Cosmos DB

  • Installare il pacchetto azure-cosmos nel notebook. Deve essere la versione 4.14.0 o successiva.

    Cella [1]:

    #Install packages
    %pip install azure-cosmos
    

Importare librerie e impostare i valori di configurazione

  • Importare i pacchetti nel notebook. In questo e in altri esempi viene usata la libreria asincrona per Cosmos DB. Applicare quindi l'endpoint Cosmos DB, il nome del database e il nome del contenitore salvati in un passaggio precedente.

    Cella [2]:

    #Imports and config values
    import logging
    
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos.exceptions import CosmosHttpResponseError
    
    COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
    COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
    COSMOS_CONTAINER_NAME = '{your-container-name}'
    

Creare una credenziale di token personalizzata per l'autenticazione

  1. Creare un oggetto FabricTokenCredential() per produrre un oggetto credenziale valido per Cosmos DB SDK dalla stringa di token generata dalle utilità di credenziali NotebookUtils di Fabric necessarie per autenticare un utente.

    [NOTE!] I notebook di Microsoft Fabric non supportano gli oggetti Credential di Azure in modo nativo. Non è possibile usare DefaultAzureCredential() per eseguire l'autenticazione a Cosmos DB in Microsoft Fabric.

    Cella [3]:

    # Custom TokenCredential implementation for Fabric authentication in a notebook
    %pip install azure-core
    from azure.core.credentials import TokenCredential, AccessToken
    import base64
    import json
    import notebookutils
    from datetime import datetime, timezone
    
    class FabricTokenCredential(TokenCredential):
    
       def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                      enable_cae: bool = False, **kwargs: Any) -> AccessToken:
          access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")
          parts = access_token.split(".")
          if len(parts) < 2:
                raise ValueError("Invalid JWT format")
          payload_b64 = parts[1]
          # Fix padding
          padding = (-len(payload_b64)) % 4
          if padding:
                payload_b64 += "=" * padding
          payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
          payload = json.loads(payload_json)
          exp = payload.get("exp")
          if exp is None:
                raise ValueError("exp claim missing in token")
          return AccessToken(token=access_token, expires_on=exp) 
    
  2. Creare un oggetto client Cosmos DB asincrono e un riferimento al contenitore Cosmos DB da utilizzare in un notebook.

    Cella [4]:

    # Initialize Cosmos DB client with custom credential and container object
    COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
    DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
    CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)
    
  3. Creare una funzione asincrona per eseguire query sul contenitore Cosmos DB

    Cella [5]:

    #Define function to search for all products by category name
    async def search_products(categoryName: str) -> List[Dict[str, Any]]:
    
       try:
          # Use parameterized query
          query = """
             SELECT 
                *
             FROM c 
             WHERE 
                c.categoryName = @categoryName AND
                c.docType = @docType
          """
    
          # Set the parameter values
          parameters = [
                {"name": "@docType", "value": "product"},
                {"name": "@categoryName", "value": categoryName}
          ]
    
          # Async query: gather results into a list
          products = [p async for p in CONTAINER.query_items(
                query=query,
                parameters=parameters
          )]
    
          return products
    
       except CosmosHttpResponseError as e:
          logging.error(f"Cosmos DB query failed: {e}")
          raise
       except Exception as e:
          logging.error(f"Unexpected error in search_products: {e}")
          raise
    
  4. Chiamare la funzione asincrona appena definita per restituire i risultati della query

    Cella [6]:

       # Search for products in a category
       products = await search_products(categoryName="Computers, Laptops")
    
       display(products) #For tabular output
       pprint(products) #Json friendly output