Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Sadu Python SDK služby Cosmos DB můžete použít v poznámkovém bloku Pythonu v Microsoft Fabric ke čtení zápisu a dotazování dat ze služby Cosmos DB v Microsoft Fabric. Můžete také vytvářet a spravovat kontejnery Cosmos DB.
Použití konektoru Spark se liší od použití Sparku ke čtení dat z Cosmos DB, které jsou v rámci Fabric zrcadleně uloženy v OneLake, protože se připojuje přímo ke koncovému bodu Cosmos DB, aby prováděl operace.
Návod
Stáhněte si kompletní ukázku ze služby Cosmos DB v ukázkách Microsoft Fabric na GitHubu.
Požadavky
Existující kapacita Fabric
- Pokud kapacitu Fabric nemáte, spusťte zkušební verzi Fabric.
Existující databáze Cosmos DB v systému Fabric
- Pokud ji ještě nemáte, vytvořte novou databázi Cosmos DB ve Fabric.
Existující kontejner s daty
- Pokud ho ještě nemáte, doporučujeme načíst ukázkový kontejner dat.
Poznámka:
Tento článek používá integrovanou ukázku Cosmos DB vytvořenou s názvem kontejneru SampleData.
Načtení koncového bodu Cosmos DB
Nejprve najděte koncový bod pro databázi Cosmos DB ve Fabricu. Tento koncový bod se vyžaduje pro připojení pomocí konektoru Spark služby Cosmos DB.
Otevřete portál Fabric (https://app.fabric.microsoft.com).
Přejděte do existující databáze Cosmos DB.
V řádku nabídek databáze vyberte možnost Nastavení .
V dialogovém okně nastavení přejděte do části Připojení . Pak zkopírujte hodnotu koncového bodu pro databázi NoSQL služby Cosmos DB. Tuto hodnotu použijete v pozdějších krocích.
Instalace balíčku Python SDK služby Cosmos DB
Nainstalujte balíček azure-cosmos do poznámkového bloku. Mělo by to být verze 4.14.0 nebo novější.
Buňka [1]:
#Install packages %pip install azure-cosmos
Import knihoven a nastavení konfiguračních hodnot
Importujte balíčky do poznámkového bloku. V této a dalších ukázkách používáme asynchronní knihovnu pro Cosmos DB. Pak použijte koncový bod služby Cosmos DB, název databáze a název kontejneru, které jste uložili v předchozím kroku.
Buňka [2]:
#Imports and config values import logging from azure.cosmos.aio import CosmosClient from azure.cosmos.exceptions import CosmosHttpResponseError COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/' COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}' COSMOS_CONTAINER_NAME = '{your-container-name}'
Vytvořte vlastní přihlašovací token pro ověření
Vytvořte objekt FabricTokenCredential(), který vytvoří platný objekt přihlašovacích údajů pro sadu SDK služby Cosmos DB z řetězce tokenu vygenerovaného nástroji pro přihlašovací údaje Fabric NotebookUtils , které se vyžadují k ověření uživatele.
[POZNÁMKA!] Poznámkové bloky Microsoft Fabric nativně nepodporují objekty přihlašovacích údajů Azure. K ověření ve službě Cosmos DB v Microsoft Fabric se nedá použít
DefaultAzureCredential().Buňka [3]:
# Custom TokenCredential implementation for Fabric authentication in a notebook %pip install azure-core from azure.core.credentials import TokenCredential, AccessToken import base64 import json import notebookutils from datetime import datetime, timezone class FabricTokenCredential(TokenCredential): def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, enable_cae: bool = False, **kwargs: Any) -> AccessToken: access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/") parts = access_token.split(".") if len(parts) < 2: raise ValueError("Invalid JWT format") payload_b64 = parts[1] # Fix padding padding = (-len(payload_b64)) % 4 if padding: payload_b64 += "=" * padding payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8") payload = json.loads(payload_json) exp = payload.get("exp") if exp is None: raise ValueError("exp claim missing in token") return AccessToken(token=access_token, expires_on=exp)Vytvořte asynchronní objekt klienta Cosmos DB a odkaz na kontejner Cosmos DB, který se má použít v poznámkovém bloku.
Buňka [4]:
# Initialize Cosmos DB client with custom credential and container object COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential()) DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME) CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)Vytvoření asynchronní funkce pro dotazování kontejneru Cosmos DB
Buňka [5]:
#Define function to search for all products by category name async def search_products(categoryName: str) -> List[Dict[str, Any]]: try: # Use parameterized query query = """ SELECT * FROM c WHERE c.categoryName = @categoryName AND c.docType = @docType """ # Set the parameter values parameters = [ {"name": "@docType", "value": "product"}, {"name": "@categoryName", "value": categoryName} ] # Async query: gather results into a list products = [p async for p in CONTAINER.query_items( query=query, parameters=parameters )] return products except CosmosHttpResponseError as e: logging.error(f"Cosmos DB query failed: {e}") raise except Exception as e: logging.error(f"Unexpected error in search_products: {e}") raiseVoláním nově definované asynchronní funkce vrátíte výsledky dotazu.
Buňka [6]:
# Search for products in a category products = await search_products(categoryName="Computers, Laptops") display(products) #For tabular output pprint(products) #Json friendly output