Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Sie können das Cosmos DB Python SDK in einem Python-Notizbuch in Microsoft Fabric verwenden, um Schreib- und Abfragedaten aus Cosmos DB in Microsoft Fabric zu lesen. Sie können auch Cosmos DB-Container erstellen und verwalten.
Die Verwendung des Spark-Connectors unterscheidet sich von der Verwendung von Spark, um Daten aus gespiegelten Fabric-Daten in der Cosmos DB zu lesen, die in OneLake gespeichert sind, da er eine direkte Verbindung zum Cosmos DB-Endpunkt herstellt, um Vorgänge auszuführen.
Tipp
Laden Sie das vollständige Beispiel von Cosmos DB in Microsoft Fabric Samples auf GitHub herunter.
Voraussetzungen
Bestehende Fabric-Kapazität
- Wenn Sie über keine Fabric-Kapazität verfügen, starten Sie eine Fabric-Testversion.
Eine vorhandene Cosmos DB-Datenbank in Fabric
- Wenn Sie noch keine Datenbank besitzen, erstellen Sie eine neue Cosmos DB-Datenbank in Fabric.
Ein vorhandener Container mit Daten
- Wenn Sie noch keins haben, empfehlen wir, den Beispieldatencontainer zu laden.
Hinweis
In diesem Artikel wird das integrierte Cosmos DB-Beispiel verwendet, das mit einem Containernamen von SampleData erstellt wurde.
Abrufen des Cosmos DB-Endpunkts
Rufen Sie zuerst den Endpunkt für die Cosmos DB-Datenbank in Fabric ab. Dieser Endpunkt ist erforderlich, um eine Verbindung mit dem Cosmos DB Spark Connector herzustellen.
Öffnen Sie das Fabric-Portal (https://app.fabric.microsoft.com).
Navigieren Sie zu Ihrer vorhandenen Cosmos DB-Datenbank.
Wählen Sie die Einstellungsoption in der Menüleiste für die Datenbank aus.
Navigieren Sie im Dialogfeld "Einstellungen" zum Abschnitt "Verbindung" . Kopieren Sie dann den Wert des Endpoints für die Cosmos DB NoSQL-Datenbank. Sie verwenden diesen Wert in einem späteren Schritt[s].
Installieren des Cosmos DB Python SDK-Pakets
Installieren Sie das Azure-Cosmos-Paket in Ihrem Notizbuch. Dies sollte Version 4.14.0 oder höher sein.
Zelle [1]:
#Install packages %pip install azure-cosmos
Importieren von Bibliotheken und Festlegen von Konfigurationswerten
Importieren Sie die Pakete in Ihr Notizbuch. In diesem und anderen Beispielen verwenden wir die asynchrone Bibliothek für Cosmos DB. Wenden Sie dann den Cosmos DB-Endpunkt, den Datenbanknamen und den Containernamen an, den Sie in einem vorherigen Schritt gespeichert haben.
Zelle [2]:
#Imports and config values import logging from azure.cosmos.aio import CosmosClient from azure.cosmos.exceptions import CosmosHttpResponseError COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/' COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}' COSMOS_CONTAINER_NAME = '{your-container-name}'
Erstellen benutzerdefinierter Token-Zugangsdaten zur Authentifizierung
Erstellen Sie ein FabricTokenCredential()-Objekt, um ein gültiges Anmeldeinformationsobjekt für das Cosmos DB SDK aus der Tokenzeichenfolge zu erzeugen, die von den Fabric NotebookUtils-Anmeldeinformationsprogrammen generiert wird, die zum Authentifizieren eines Benutzers erforderlich sind.
[HINWEIS!] Microsoft Fabric-Notizbücher unterstützen keine nativen Azure-Anmeldeinformationen-Objekte. Sie können sich nicht mit
DefaultAzureCredential()bei Cosmos DB innerhalb von Microsoft Fabric authentifizieren.Zelle [3]:
# Custom TokenCredential implementation for Fabric authentication in a notebook %pip install azure-core from azure.core.credentials import TokenCredential, AccessToken import base64 import json import notebookutils from datetime import datetime, timezone class FabricTokenCredential(TokenCredential): def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, enable_cae: bool = False, **kwargs: Any) -> AccessToken: access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/") parts = access_token.split(".") if len(parts) < 2: raise ValueError("Invalid JWT format") payload_b64 = parts[1] # Fix padding padding = (-len(payload_b64)) % 4 if padding: payload_b64 += "=" * padding payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8") payload = json.loads(payload_json) exp = payload.get("exp") if exp is None: raise ValueError("exp claim missing in token") return AccessToken(token=access_token, expires_on=exp)Erstellen Sie ein asynchrones Cosmos DB-Clientobjekt und einen Verweis auf den Cosmos DB-Container, der in einem Notizbuch verwendet werden soll.
Zelle [4]:
# Initialize Cosmos DB client with custom credential and container object COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential()) DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME) CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)Erstellen einer asynchronen Funktion zum Abfragen des Cosmos DB-Containers
Zelle [5]:
#Define function to search for all products by category name async def search_products(categoryName: str) -> List[Dict[str, Any]]: try: # Use parameterized query query = """ SELECT * FROM c WHERE c.categoryName = @categoryName AND c.docType = @docType """ # Set the parameter values parameters = [ {"name": "@docType", "value": "product"}, {"name": "@categoryName", "value": categoryName} ] # Async query: gather results into a list products = [p async for p in CONTAINER.query_items( query=query, parameters=parameters )] return products except CosmosHttpResponseError as e: logging.error(f"Cosmos DB query failed: {e}") raise except Exception as e: logging.error(f"Unexpected error in search_products: {e}") raiseRufen Sie die neu definierte asynchrone Funktion auf, um die Ergebnisse der Abfrage zurückzugeben.
Zelle [6]:
# Search for products in a category products = await search_products(categoryName="Computers, Laptops") display(products) #For tabular output pprint(products) #Json friendly output