You can use the Cosmos DB Python SDK in a Python notebook in Microsoft Fabric to read, write, and query data from Cosmos DB in Microsoft Fabric. You can also create and manage Cosmos DB containers.
Using the Python SDK is different from using Spark to read the Cosmos DB in Fabric mirrored data stored in OneLake, because the SDK connects directly to the Cosmos DB endpoint to perform operations.
Tip
Download the complete sample from Cosmos DB in Microsoft Fabric Samples on GitHub.
Prerequisites
An existing Fabric capacity
- If you don't have Fabric capacity, start a Fabric trial.
An existing Cosmos DB database in Fabric
- If you don't have one already, create a new Cosmos DB database in Fabric.
An existing container with data
- If you don't have one already, we suggest that you load the sample data container.
Note
This article uses the built-in Cosmos DB sample created with a container name of SampleData.
Retrieve Cosmos DB endpoint
First, get the endpoint for the Cosmos DB database in Fabric. This endpoint is required to connect using the Cosmos DB Python SDK.
Open the Fabric portal (https://app.fabric.microsoft.com).
Navigate to your existing Cosmos DB database.
Select the Settings option in the menu bar for the database.
In the settings dialog, navigate to the Connection section. Then, copy the value of the Endpoint for Cosmos DB NoSQL database field. You use this value in a later step.
Install the Cosmos DB Python SDK package
Install the azure-cosmos package in your notebook. This should be version 4.14.0 or later.
Cell [1]:
# Install packages
%pip install azure-cosmos
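To confirm that the installed package meets the 4.14.0 minimum mentioned above, a quick check along these lines can help. This is a minimal sketch using only the standard library; the helper names `parse_release`, `meets_minimum`, and `check_azure_cosmos` are hypothetical and not part of the SDK, and the comparison deliberately ignores pre-release suffixes.

```python
from importlib.metadata import PackageNotFoundError, version

MINIMUM_VERSION = "4.14.0"  # minimum version stated in this article


def parse_release(version_text: str) -> tuple:
    """Turn '4.14.0' into (4, 14, 0); suffixes such as 'b1' are ignored."""
    numbers = []
    for part in version_text.split(".")[:3]:
        digits = ""
        for ch in part:
            if not ch.isdigit():
                break
            digits += ch
        numbers.append(int(digits or 0))
    # Pad short versions such as '4' or '4.14' with zeros
    while len(numbers) < 3:
        numbers.append(0)
    return tuple(numbers)


def meets_minimum(installed: str, minimum: str = MINIMUM_VERSION) -> bool:
    """Return True when the installed release is at least the minimum."""
    return parse_release(installed) >= parse_release(minimum)


def check_azure_cosmos() -> str:
    """Describe whether the installed azure-cosmos package is recent enough."""
    try:
        installed = version("azure-cosmos")
    except PackageNotFoundError:
        return "azure-cosmos is not installed"
    if meets_minimum(installed):
        return f"azure-cosmos {installed} meets the {MINIMUM_VERSION} minimum"
    return f"azure-cosmos {installed} is older than {MINIMUM_VERSION}; upgrade it"


print(check_azure_cosmos())
```

If the check reports an older version, rerunning the install cell with an upgrade and restarting the notebook session is usually enough.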
Import libraries and set configuration values
Import the packages into your notebook. In this and other samples, we use the async library for Cosmos DB. Then set the Cosmos DB endpoint you copied in a previous step, along with the database name and container name.
Cell [2]:
# Imports and config values
import logging

from azure.cosmos.aio import CosmosClient
from azure.cosmos.exceptions import CosmosHttpResponseError

COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
COSMOS_CONTAINER_NAME = '{your-container-name}'
Create a custom token credential to authenticate
Create a FabricTokenCredential class that wraps the token string generated by the Fabric NotebookUtils credential utilities in a credential object that the Cosmos DB SDK accepts. The SDK requires this credential object to authenticate the user.
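The credential class in Cell [3] has to report when the token expires, which it does by decoding the `exp` claim from the token's payload segment. The following standalone sketch isolates that step so it can be tried without a Fabric session. It assumes the token is a JWT with a JSON payload; `read_exp_claim` and `make_unsigned_demo_token` are hypothetical helpers, and the demo token is fake and unsigned, for illustration only.

```python
import base64
import json
import time


def read_exp_claim(jwt_token: str) -> int:
    """Return the 'exp' claim (seconds since the epoch) from a JWT payload."""
    parts = jwt_token.split(".")
    if len(parts) < 2:
        raise ValueError("Invalid JWT format")
    payload_b64 = parts[1]
    # JWT segments omit base64 padding, so restore it before decoding
    payload_b64 += "=" * (-len(payload_b64) % 4)
    payload = json.loads(base64.urlsafe_b64decode(payload_b64))
    exp = payload.get("exp")
    if exp is None:
        raise ValueError("exp claim missing in token")
    return int(exp)


def make_unsigned_demo_token(exp: int) -> str:
    """Build a fake, unsigned token. Never use this for real authentication."""
    def encode(obj: dict) -> str:
        raw = json.dumps(obj).encode("utf-8")
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")

    return f"{encode({'alg': 'none'})}.{encode({'exp': exp})}."


demo_exp = int(time.time()) + 3600  # pretend the token expires in one hour
demo_token = make_unsigned_demo_token(demo_exp)
print(read_exp_claim(demo_token) == demo_exp)
```

Note that this only reads the claim. It does not validate the token's signature, which is left to the service that receives the token.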
Note
Microsoft Fabric notebooks do not support Azure Credential objects natively. You cannot use DefaultAzureCredential() to authenticate to Cosmos DB in Microsoft Fabric.
Cell [3]:
# Custom TokenCredential implementation for Fabric authentication in a notebook
%pip install azure-core

import base64
import json
from datetime import datetime, timezone
from typing import Any, Optional

import notebookutils
from azure.core.credentials import TokenCredential, AccessToken


class FabricTokenCredential(TokenCredential):

    def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                  enable_cae: bool = False, **kwargs: Any) -> AccessToken:
        # Request a token for Cosmos DB from the Fabric notebook utilities
        access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")

        # A JWT has the form header.payload.signature
        parts = access_token.split(".")
        if len(parts) < 2:
            raise ValueError("Invalid JWT format")
        payload_b64 = parts[1]

        # Fix padding
        padding = (-len(payload_b64)) % 4
        if padding:
            payload_b64 += "=" * padding

        # Decode the payload and read the expiry claim
        payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
        payload = json.loads(payload_json)
        exp = payload.get("exp")
        if exp is None:
            raise ValueError("exp claim missing in token")

        return AccessToken(token=access_token, expires_on=exp)

Create an async Cosmos DB client object and a reference to the Cosmos DB container to use in a notebook.
Cell [4]:
# Initialize Cosmos DB client with custom credential and container object
COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)

Create an async function to query the Cosmos DB container.
Cell [5]:
# Define function to search for all products by category name
from typing import Any, Dict, List


async def search_products(categoryName: str) -> List[Dict[str, Any]]:

    try:
        # Use parameterized query
        query = """
            SELECT
                *
            FROM c
            WHERE
                c.categoryName = @categoryName AND
                c.docType = @docType
        """

        # Set the parameter values
        parameters = [
            {"name": "@docType", "value": "product"},
            {"name": "@categoryName", "value": categoryName}
        ]

        # Async query: gather results into a list
        products = [p async for p in CONTAINER.query_items(
            query=query,
            parameters=parameters
        )]

        return products

    except CosmosHttpResponseError as e:
        logging.error(f"Cosmos DB query failed: {e}")
        raise
    except Exception as e:
        logging.error(f"Unexpected error in search_products: {e}")
        raise

Call the newly defined async function to return the results of the query.
Cell [6]:
# Search for products in a category
from pprint import pprint

products = await search_products(categoryName="Computers, Laptops")

display(products)  # For tabular output
pprint(products)   # JSON-friendly output
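Items returned by a `SELECT *` query include Cosmos DB system properties such as `_rid`, `_self`, `_etag`, `_attachments`, and `_ts` alongside your own fields. If you want cleaner output, one option is to strip them before printing. The sketch below is self-contained and runs on a stand-in item: `strip_system_properties` is a hypothetical helper, and the `id`, `name`, and system property values in `sample_results` are invented for illustration rather than taken from the SampleData container.

```python
import json
from typing import Any, Dict, List

# System-generated properties that Cosmos DB adds to every item
SYSTEM_PROPERTIES = {"_rid", "_self", "_etag", "_attachments", "_ts"}


def strip_system_properties(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return copies of the items without Cosmos DB system properties."""
    return [
        {key: value for key, value in item.items() if key not in SYSTEM_PROPERTIES}
        for item in items
    ]


# Stand-in for the list returned by search_products (values are made up)
sample_results = [
    {
        "id": "example-id",
        "docType": "product",
        "categoryName": "Computers, Laptops",
        "name": "Example laptop",
        "_rid": "example-rid",
        "_self": "example-self-link",
        "_etag": "example-etag",
        "_attachments": "attachments/",
        "_ts": 1700000000,
    }
]

print(json.dumps(strip_system_properties(sample_results), indent=2))
```

In the notebook, you would pass the `products` list from Cell [6] instead of `sample_results`. The helper builds new dictionaries, so the original query results stay unchanged.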