Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
È possibile usare il Cosmos DB Python SDK in un notebook Python all'interno di Microsoft Fabric per leggere ed eseguire query sui dati da Cosmos DB in Microsoft Fabric. È anche possibile creare e gestire contenitori Cosmos DB.
L'uso del connettore Spark è diverso dall'uso di Spark per leggere i dati di Cosmos DB in Fabric con dati in mirroring archiviati in OneLake, perché si connette direttamente all'endpoint di Cosmos DB per eseguire operazioni.
Suggerimento
Scarica l'esempio completo da Cosmos DB in Microsoft Fabric Samples su GitHub.
Prerequisiti
Una capacità di Fabric esistente
- Se non si ha capacità di Fabric, attivare una versione di prova di Fabric.
Un database Cosmos DB esistente in Fabric
- Se non ne è già disponibile uno, creare un nuovo database Cosmos DB in Fabric.
Contenitore esistente con dati
- Se non è già disponibile, è consigliabile caricare il contenitore di dati di esempio.
Annotazioni
Questo articolo usa l'esempio predefinito di Cosmos DB creato con un nome contenitore SampleData.
Recuperare l'endpoint di Cosmos DB
Ottenere prima di tutto l'endpoint per il database Cosmos DB in Fabric. Questo endpoint è necessario per connettersi usando il connettore Spark di Cosmos DB.
Apri il portale Fabric (https://app.fabric.microsoft.com).
Navigare al database Cosmos DB esistente.
Selezionare l'opzione Impostazioni nella barra dei menu per il database.
Nella finestra di dialogo impostazioni passare alla sezione Connessione . Copia poi il valore del campo Endpoint del database NoSQL per Cosmos DB. Questo valore viene usato nei passaggi successivi.
Installare il pacchetto Python SDK di Cosmos DB
Installare il pacchetto azure-cosmos nel notebook. Deve essere la versione 4.14.0 o successiva.
Cella [1]:
#Install packages %pip install azure-cosmos
Importare librerie e impostare i valori di configurazione
Importare i pacchetti nel notebook. In questo e in altri esempi viene usata la libreria asincrona per Cosmos DB. Applicare quindi l'endpoint Cosmos DB, il nome del database e il nome del contenitore salvati in un passaggio precedente.
Cella [2]:
#Imports and config values import logging from azure.cosmos.aio import CosmosClient from azure.cosmos.exceptions import CosmosHttpResponseError COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/' COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}' COSMOS_CONTAINER_NAME = '{your-container-name}'
Creare una credenziale di token personalizzata per l'autenticazione
Creare un oggetto FabricTokenCredential() per produrre un oggetto credenziale valido per Cosmos DB SDK dalla stringa di token generata dalle utilità di credenziali NotebookUtils di Fabric necessarie per autenticare un utente.
[NOTE!] I notebook di Microsoft Fabric non supportano gli oggetti Credential di Azure in modo nativo. Non è possibile usare
DefaultAzureCredential()per eseguire l'autenticazione a Cosmos DB in Microsoft Fabric.Cella [3]:
# Custom TokenCredential implementation for Fabric authentication in a notebook %pip install azure-core from azure.core.credentials import TokenCredential, AccessToken import base64 import json import notebookutils from datetime import datetime, timezone class FabricTokenCredential(TokenCredential): def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, enable_cae: bool = False, **kwargs: Any) -> AccessToken: access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/") parts = access_token.split(".") if len(parts) < 2: raise ValueError("Invalid JWT format") payload_b64 = parts[1] # Fix padding padding = (-len(payload_b64)) % 4 if padding: payload_b64 += "=" * padding payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8") payload = json.loads(payload_json) exp = payload.get("exp") if exp is None: raise ValueError("exp claim missing in token") return AccessToken(token=access_token, expires_on=exp)Creare un oggetto client Cosmos DB asincrono e un riferimento al contenitore Cosmos DB da utilizzare in un notebook.
Cella [4]:
# Initialize Cosmos DB client with custom credential and container object COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential()) DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME) CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)Creare una funzione asincrona per eseguire query sul contenitore Cosmos DB
Cella [5]:
#Define function to search for all products by category name async def search_products(categoryName: str) -> List[Dict[str, Any]]: try: # Use parameterized query query = """ SELECT * FROM c WHERE c.categoryName = @categoryName AND c.docType = @docType """ # Set the parameter values parameters = [ {"name": "@docType", "value": "product"}, {"name": "@categoryName", "value": categoryName} ] # Async query: gather results into a list products = [p async for p in CONTAINER.query_items( query=query, parameters=parameters )] return products except CosmosHttpResponseError as e: logging.error(f"Cosmos DB query failed: {e}") raise except Exception as e: logging.error(f"Unexpected error in search_products: {e}") raiseChiamare la funzione asincrona appena definita per restituire i risultati della query
Cella [6]:
# Search for products in a category products = await search_products(categoryName="Computers, Laptops") display(products) #For tabular output pprint(products) #Json friendly output