Bemærk
Adgang til denne side kræver godkendelse. Du kan prøve at logge på eller ændre mapper.
Adgang til denne side kræver godkendelse. Du kan prøve at ændre mapper.
Du kan bruge Cosmos DB Python SDK i en Python-notebook i Microsoft Fabric til at læse, skrive og forespørge data fra Cosmos DB i Microsoft Fabric. Du kan også oprette og administrere Cosmos DB-containere.
Brugen af Spark-connector adskiller sig fra at bruge Spark til at læse data fra Cosmos DB i Fabric-spejlede data, der er gemt i OneLake, da den forbinder direkte til Cosmos DB-endpointet for at udføre operationer.
Tips
Download det komplette eksempel fra Cosmos DB i Microsoft Fabric Samples på GitHub.
Forudsætninger
En eksisterende Fabric-kapacitet
- Hvis du ikke har Fabric-kapacitet, kan du starte en Fabric-prøveversion.
En eksisterende Cosmos DB-database i Fabric
- Hvis du ikke allerede har en, kan du oprette en ny Cosmos DB-database i Fabric.
En eksisterende objektbeholder med data
- Hvis du ikke allerede har en, foreslår vi, at du indlæser eksempeldataobjektbeholderen.
Notat
Denne artikel bruger det indbyggede Cosmos DB-eksempel oprettet med containernavnet SampleData.
Hent Cosmos DB-slutpunkt
Først skal du hente slutpunktet for Cosmos DB-databasen i Fabric. Dette endepunkt er nødvendigt for at kunne forbinde via Cosmos DB Spark Connector.
Åbn Fabric-portalen (https://app.fabric.microsoft.com).
Gå til din eksisterende Cosmos DB-database.
Vælg indstillingen Indstillinger i menulinjen for databasen.
I indstillingsdialogen skal du navigere til afsnittet Forbindelse . Kopiér derefter værdien af slutpunktet for Cosmos DB NoSQL-databasefeltet . Du bruger denne værdi i senere trin[s].
Installer Cosmos DB Python SDK-pakken
Installer azure-cosmos-pakken i din notebook. Dette burde være version 4.14.0 eller nyere.
Celle [1]:
#Install packages %pip install azure-cosmos
Importbiblioteker og sæt konfigurationsværdier
Importer pakkerne i din notesbog. I dette og andre eksempler bruger vi det asynkrone bibliotek til Cosmos DB. Derefter anvender du Cosmos DB-endpointet, databasenavnet og containernavnet, som du har gemt i et tidligere trin.
Celle [2]:
#Imports and config values import logging from azure.cosmos.aio import CosmosClient from azure.cosmos.exceptions import CosmosHttpResponseError COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/' COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}' COSMOS_CONTAINER_NAME = '{your-container-name}'
Opret en brugerdefineret token-legitimation til autentificering
Opret et FabricTokenCredential()-objekt for at producere et gyldigt legitimationsobjekt til Cosmos DB SDK ud fra tokenstrengen genereret af Fabric NotebookUtils-legitimationsværktøjerne , som er nødvendige for at autentificere en bruger.
[BEMÆRK!] Microsoft Fabric-notebooks understøtter ikke Azure Credential-objekter indbygget. Du kan ikke bruge
DefaultAzureCredential()den til at autentificere til Cosmos DB i Microsoft Fabric.Celle [3]:
# Custom TokenCredential implementation for Fabric authentication in a notebook %pip install azure-core from azure.core.credentials import TokenCredential, AccessToken import base64 import json import notebookutils from datetime import datetime, timezone class FabricTokenCredential(TokenCredential): def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, enable_cae: bool = False, **kwargs: Any) -> AccessToken: access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/") parts = access_token.split(".") if len(parts) < 2: raise ValueError("Invalid JWT format") payload_b64 = parts[1] # Fix padding padding = (-len(payload_b64)) % 4 if padding: payload_b64 += "=" * padding payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8") payload = json.loads(payload_json) exp = payload.get("exp") if exp is None: raise ValueError("exp claim missing in token") return AccessToken(token=access_token, expires_on=exp)Opret et asynkront Cosmos DB-klientobjekt og en reference til Cosmos DB-containeren til brug i en notesbog.
Celle [4]:
# Initialize Cosmos DB client with custom credential and container object COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential()) DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME) CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)Opret en asynkron funktion til at forespørge Cosmos DB-containeren
Celle [5]:
#Define function to search for all products by category name async def search_products(categoryName: str) -> List[Dict[str, Any]]: try: # Use parameterized query query = """ SELECT * FROM c WHERE c.categoryName = @categoryName AND c.docType = @docType """ # Set the parameter values parameters = [ {"name": "@docType", "value": "product"}, {"name": "@categoryName", "value": categoryName} ] # Async query: gather results into a list products = [p async for p in CONTAINER.query_items( query=query, parameters=parameters )] return products except CosmosHttpResponseError as e: logging.error(f"Cosmos DB query failed: {e}") raise except Exception as e: logging.error(f"Unexpected error in search_products: {e}") raiseKald den nyligt definerede asynkrone funktion for at returnere resultaterne af forespørgslen
Celle [6]:
# Search for products in a category products = await search_products(categoryName="Computers, Laptops") display(products) #For tabular output pprint(products) #Json friendly output