Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Zestaw SDK języka Python usługi Cosmos DB można używać w notesie języka Python w usłudze Microsoft Fabric do odczytywania, zapisywania i wykonywania zapytań dotyczących danych z usługi Cosmos DB w Microsoft Fabric. Można również tworzyć kontenery usługi Cosmos DB i zarządzać nimi.
Używanie łącznika Spark różni się od używania Spark do odczytywania danych z Cosmos DB w Fabric z dublowanymi danymi przechowywanymi w OneLake, ponieważ łączy się bezpośrednio z punktem końcowym Cosmos DB w celu wykonywania operacji.
Wskazówka
Pobierz kompletny przykład z usługi Cosmos DB w przykładach usługi Microsoft Fabric w witrynie GitHub.
Wymagania wstępne
Istniejąca zdolność sieci Fabric
- Jeśli nie masz pojemności Fabric, uruchom wersję próbną usługi Fabric.
Istniejąca baza danych Cosmos DB na platformie Fabric
- Jeśli jeszcze go nie masz, utwórz nową bazę danych Cosmos DB w usłudze Fabric.
Istniejący kontener z danymi
- Jeśli jeszcze go nie masz, sugerujemy załadowanie przykładowego kontenera danych.
Uwaga / Notatka
W tym artykule użyto wbudowanego przykładu usługi Cosmos DB utworzonego z nazwą kontenera SampleData.
Pobierz punkt końcowy usługi Cosmos DB
Najpierw uzyskaj punkt końcowy dla bazy danych Cosmos DB w Fabric. Ten punkt końcowy jest wymagany do nawiązania połączenia przy użyciu łącznika Spark usługi Cosmos DB.
Otwórz portal Fabric (https://app.fabric.microsoft.com).
Przejdź do istniejącej bazy danych Cosmos DB.
Wybierz opcję Ustawienia na pasku menu bazy danych.
W oknie dialogowym ustawień przejdź do sekcji Połączenie . Następnie skopiuj wartość pola Punkt końcowy dla bazy danych NoSQL usługi Cosmos DB . Ta wartość jest używana w późniejszym kroku lub krokach.
Instalowanie pakietu zestawu SDK języka Python usługi Cosmos DB
Zainstaluj bibliotekę azure-cosmos w notebooku. Powinna to być wersja 4.14.0 lub nowsza.
Komórka [1]:
#Install packages %pip install azure-cosmos
Importowanie bibliotek i ustawianie wartości konfiguracji
Zaimportuj pakiety do notesu. W tym i innych przykładach używamy biblioteki asynchronicznej do Cosmos DB. Następnie zastosuj punkt końcowy usługi Cosmos DB, nazwę bazy danych i nazwę kontenera zapisaną w poprzednim kroku.
Komórka [2]:
#Imports and config values import logging from azure.cosmos.aio import CosmosClient from azure.cosmos.exceptions import CosmosHttpResponseError COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/' COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}' COSMOS_CONTAINER_NAME = '{your-container-name}'
Utwórz niestandardowe poświadczenia tokenowe w celu uwierzytelnienia
Utwórz obiekt FabricTokenCredential(), aby utworzyć prawidłowy obiekt poświadczeń dla zestawu SDK usługi Cosmos DB z ciągu tokenu wygenerowanego przez narzędzia poświadczeń Fabric NotebookUtils wymagane do uwierzytelnienia użytkownika.
[UWAGA!] Notatniki Microsoft Fabric nie obsługują natywnie obiektów poświadczeń Azure. Nie można użyć
DefaultAzureCredential()do uwierzytelniania do usługi Cosmos DB w usłudze Microsoft Fabric.Komórka [3]:
# Custom TokenCredential implementation for Fabric authentication in a notebook %pip install azure-core from azure.core.credentials import TokenCredential, AccessToken import base64 import json import notebookutils from datetime import datetime, timezone class FabricTokenCredential(TokenCredential): def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, enable_cae: bool = False, **kwargs: Any) -> AccessToken: access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/") parts = access_token.split(".") if len(parts) < 2: raise ValueError("Invalid JWT format") payload_b64 = parts[1] # Fix padding padding = (-len(payload_b64)) % 4 if padding: payload_b64 += "=" * padding payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8") payload = json.loads(payload_json) exp = payload.get("exp") if exp is None: raise ValueError("exp claim missing in token") return AccessToken(token=access_token, expires_on=exp)Utwórz obiekt klienta asynchronicznego usługi Cosmos DB i odwołanie do kontenera usługi Cosmos DB do użycia w notesie.
Komórka [4]:
# Initialize Cosmos DB client with custom credential and container object COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential()) DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME) CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)Tworzenie funkcji asynchronicznych w celu wykonywania zapytań względem kontenera usługi Cosmos DB
Komórka [5]:
#Define function to search for all products by category name async def search_products(categoryName: str) -> List[Dict[str, Any]]: try: # Use parameterized query query = """ SELECT * FROM c WHERE c.categoryName = @categoryName AND c.docType = @docType """ # Set the parameter values parameters = [ {"name": "@docType", "value": "product"}, {"name": "@categoryName", "value": categoryName} ] # Async query: gather results into a list products = [p async for p in CONTAINER.query_items( query=query, parameters=parameters )] return products except CosmosHttpResponseError as e: logging.error(f"Cosmos DB query failed: {e}") raise except Exception as e: logging.error(f"Unexpected error in search_products: {e}") raiseWywołaj nowo zdefiniowaną funkcję async, aby zwrócić wyniki zapytania
Komórka [6]:
# Search for products in a category products = await search_products(categoryName="Computers, Laptops") display(products) #For tabular output pprint(products) #Json friendly output