Merk
Tilgang til denne siden krever autorisasjon. Du kan prøve å logge på eller endre kataloger.
Tilgang til denne siden krever autorisasjon. Du kan prøve å endre kataloger.
Du kan bruke Cosmos DB Python SDK i en Python-notatbok i Microsoft Fabric for å lese, skrive og spørre data fra Cosmos DB i Microsoft Fabric. Du kan også opprette og administrere Cosmos DB-containere.
Å bruke Spark connector er annerledes enn å bruke Spark til å lese data fra Cosmos DB i Fabric speilet data lagret i OneLake, siden det kobles direkte til Cosmos DB-endepunktet for å utføre operasjoner.
Tips
Last ned det komplette eksempelet fra Cosmos DB i Microsoft Fabric Samples på GitHub.
Forutsetninger
En eksisterende stoffkapasitet
- Hvis du ikke har stoffkapasitet, kan du starte en fabric-prøveversjon.
En eksisterende Cosmos DB-database i Fabric
- Hvis du ikke allerede har en, oppretter du en ny Cosmos DB-database i Fabric.
En eksisterende beholder med data
- Hvis du ikke allerede har en, foreslår vi at du laster inn eksempeldatabeholderen.
Note
Denne artikkelen bruker det innebygde Cosmos DB-eksempelet laget med containernavnet SampleData.
Hent Cosmos DB-endepunkt
Først, hent endepunktet for Cosmos DB-databasen i Fabric. Dette endepunktet kreves for å koble til via Cosmos DB Spark Connector.
Åpne Stoffportalen (https://app.fabric.microsoft.com).
Gå til den eksisterende Cosmos DB-databasen.
Velg innstillingsvalget i menylinjen for databasen.
I innstillingsdialogen, naviger til Tilkoblingsseksjonen. Deretter kopierer du verdien i Endpoint for Cosmos DB NoSQL-databasefeltet . Du bruker denne verdien i senere trinn.
Installer Cosmos DB Python SDK-pakken
Installer Azure-Cosmos-pakken i notatboken din. Dette bør være versjon 4.14.0 eller nyere.
Celle [1]:
#Install packages %pip install azure-cosmos
Importer biblioteker og sett konfigurasjonsverdier
Importer pakkene til notatboken din. I dette og andre eksempler bruker vi det asynkrone biblioteket for Cosmos DB. Deretter anvender du Cosmos DB-endepunktet, databasenavnet og containernavnet du lagret i et tidligere steg.
Celle [2]:
#Imports and config values import logging from azure.cosmos.aio import CosmosClient from azure.cosmos.exceptions import CosmosHttpResponseError COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/' COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}' COSMOS_CONTAINER_NAME = '{your-container-name}'
Opprett en egendefinert token-legitimasjon for autentisering
Lag et FabricTokenCredential()-objekt for å produsere et gyldig legitimasjonsobjekt for Cosmos DB SDK fra tokenstrengen generert av Fabric NotebookUtils-legitimasjonsverktøyene som kreves for å autentisere en bruker.
[OBS!] Microsoft Fabric-notatbøker støtter ikke Azure Credential-objekter nativt. Du kan ikke bruke
DefaultAzureCredential()den til å autentisere til Cosmos DB i Microsoft Fabric.Celle [3]:
# Custom TokenCredential implementation for Fabric authentication in a notebook %pip install azure-core from azure.core.credentials import TokenCredential, AccessToken import base64 import json import notebookutils from datetime import datetime, timezone class FabricTokenCredential(TokenCredential): def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, enable_cae: bool = False, **kwargs: Any) -> AccessToken: access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/") parts = access_token.split(".") if len(parts) < 2: raise ValueError("Invalid JWT format") payload_b64 = parts[1] # Fix padding padding = (-len(payload_b64)) % 4 if padding: payload_b64 += "=" * padding payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8") payload = json.loads(payload_json) exp = payload.get("exp") if exp is None: raise ValueError("exp claim missing in token") return AccessToken(token=access_token, expires_on=exp)Lag et asynkront Cosmos DB-klientobjekt og en referanse til Cosmos DB-containeren for bruk i en notatbok.
Celle [4]:
# Initialize Cosmos DB client with custom credential and container object COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential()) DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME) CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)Lag en asynkron funksjon for å spørre Cosmos DB-containeren
Celle [5]:
#Define function to search for all products by category name async def search_products(categoryName: str) -> List[Dict[str, Any]]: try: # Use parameterized query query = """ SELECT * FROM c WHERE c.categoryName = @categoryName AND c.docType = @docType """ # Set the parameter values parameters = [ {"name": "@docType", "value": "product"}, {"name": "@categoryName", "value": categoryName} ] # Async query: gather results into a list products = [p async for p in CONTAINER.query_items( query=query, parameters=parameters )] return products except CosmosHttpResponseError as e: logging.error(f"Cosmos DB query failed: {e}") raise except Exception as e: logging.error(f"Unexpected error in search_products: {e}") raiseKall den nylig definerte asynkrone funksjonen for å returnere resultatene av spørringen
Celle [6]:
# Search for products in a category products = await search_products(categoryName="Computers, Laptops") display(products) #For tabular output pprint(products) #Json friendly output