Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of mappen te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen om mappen te wijzigen.
U kunt de Cosmos DB Python SDK in een Python-notebook in Microsoft Fabric gebruiken om schrijf- en querygegevens uit Cosmos DB in Microsoft Fabric te lezen en er query's op uit te voeren. U kunt ook Cosmos DB-containers maken en beheren.
Het gebruik van de Spark-connector verschilt van het gebruik van Spark om gegevens te lezen uit de in Fabric gespiegelde gegevens die zijn opgeslagen in OneLake, omdat het rechtstreeks verbinding maakt met het Cosmos DB-eindpunt om bewerkingen uit te voeren.
Aanbeveling
Download het volledige voorbeeld uit Cosmos DB in Microsoft Fabric Samples op GitHub.
Vereiste voorwaarden
Een bestaande Fabric-capaciteit
- Als je geen Fabric-capaciteit hebt, begin dan een proefversie van Fabric.
Een bestaande Cosmos DB-database in Fabric
- Als u er nog geen hebt, maakt u een nieuwe Cosmos DB-database in Fabric.
Een bestaande container met gegevens
- Als u er nog geen hebt, raden we u aan om de container met voorbeeldgegevens te laden.
Opmerking
In dit artikel wordt het ingebouwde Cosmos DB-voorbeeld gebruikt dat is gemaakt met een containernaam van SampleData.
Cosmos DB-eindpunt ophalen
Haal eerst het eindpunt op voor de Cosmos DB-database in Fabric. Dit eindpunt is vereist om verbinding te maken met behulp van de Cosmos DB Spark-connector.
Open de Fabric-portal (https://app.fabric.microsoft.com).
Navigeer naar uw bestaande Cosmos DB-database.
Selecteer de optie Instellingen in de menubalk voor de database.
Navigeer in het dialoogvenster Instellingen naar de sectie Verbinding . Kopieer vervolgens de waarde van het veld Eindpunt voor Cosmos DB NoSQL-database . U gebruikt deze waarde in latere stap[s].
Het Python SDK-pakket van Cosmos DB installeren
Installeer het azure-cosmos-pakket in uw notebook. Dit moet versie 4.14.0 of hoger zijn.
Cel [1]:
#Install packages %pip install azure-cosmos
Bibliotheken importeren en configuratiewaarden instellen
Importeer de pakketten in uw notebook. In deze en andere voorbeelden gebruiken we de asynchrone bibliotheek voor Cosmos DB. Pas vervolgens het Cosmos DB-eindpunt, de databasenaam en de containernaam toe die u in een vorige stap hebt opgeslagen.
Cel [2]:
#Imports and config values import logging from azure.cosmos.aio import CosmosClient from azure.cosmos.exceptions import CosmosHttpResponseError COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/' COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}' COSMOS_CONTAINER_NAME = '{your-container-name}'
Een aangepaste tokenreferentie maken om te verifiëren
Maak een FabricTokenCredential()-object om een geldig referentieobject voor de Cosmos DB SDK te produceren op basis van de tokentekenreeks die is gegenereerd door de referentiehulpprogramma's van Fabric NotebookUtils die vereist zijn om een gebruiker te verifiëren.
[OPMERKING!] Microsoft Fabric-notebooks bieden geen systeemeigen ondersteuning voor Azure Credential-objecten. U kunt niet gebruiken
DefaultAzureCredential()om te verifiëren bij Cosmos DB in Microsoft Fabric.Cel [3]:
# Custom TokenCredential implementation for Fabric authentication in a notebook %pip install azure-core from azure.core.credentials import TokenCredential, AccessToken import base64 import json import notebookutils from datetime import datetime, timezone class FabricTokenCredential(TokenCredential): def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, enable_cae: bool = False, **kwargs: Any) -> AccessToken: access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/") parts = access_token.split(".") if len(parts) < 2: raise ValueError("Invalid JWT format") payload_b64 = parts[1] # Fix padding padding = (-len(payload_b64)) % 4 if padding: payload_b64 += "=" * padding payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8") payload = json.loads(payload_json) exp = payload.get("exp") if exp is None: raise ValueError("exp claim missing in token") return AccessToken(token=access_token, expires_on=exp)Maak een asynchroon Cosmos DB-clientobject en een verwijzing naar de Cosmos DB-container die in een notebook moet worden gebruikt.
Cel [4]:
# Initialize Cosmos DB client with custom credential and container object COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential()) DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME) CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)Een asynchrone functie maken om een query uit te voeren op de Cosmos DB-container
Cel [5]:
#Define function to search for all products by category name async def search_products(categoryName: str) -> List[Dict[str, Any]]: try: # Use parameterized query query = """ SELECT * FROM c WHERE c.categoryName = @categoryName AND c.docType = @docType """ # Set the parameter values parameters = [ {"name": "@docType", "value": "product"}, {"name": "@categoryName", "value": categoryName} ] # Async query: gather results into a list products = [p async for p in CONTAINER.query_items( query=query, parameters=parameters )] return products except CosmosHttpResponseError as e: logging.error(f"Cosmos DB query failed: {e}") raise except Exception as e: logging.error(f"Unexpected error in search_products: {e}") raiseRoep de zojuist gedefinieerde asynchrone functie aan om de resultaten van de query te retourneren
Cel [6]:
# Search for products in a category products = await search_products(categoryName="Computers, Laptops") display(products) #For tabular output pprint(products) #Json friendly output