Work with Cosmos DB in a Python notebook in Microsoft Fabric

You can use the Cosmos DB Python SDK in a Python notebook in Microsoft Fabric to read, write, and query data in Cosmos DB in Microsoft Fabric. You can also create and manage Cosmos DB containers.

Using the Cosmos DB Python SDK is different from using Spark to read the mirrored Cosmos DB data stored in OneLake, because the SDK connects directly to the Cosmos DB endpoint to perform operations.

Tip

Download the complete sample from Cosmos DB in Microsoft Fabric Samples on GitHub.

Prerequisites

Note

This article uses the built-in Cosmos DB sample created with a container name of SampleData.

Retrieve Cosmos DB endpoint

First, get the endpoint for the Cosmos DB database in Fabric. This endpoint is required to connect using the Cosmos DB Python SDK.

  1. Open the Fabric portal (https://app.fabric.microsoft.com).

  2. Navigate to your existing Cosmos DB database.

  3. Select the Settings option in the menu bar for the database.

    Screenshot of the 'Settings' menu bar option for a database in the Fabric portal.

  4. In the settings dialog, navigate to the Connection section. Then, copy the value of the Endpoint for Cosmos DB NoSQL database field. You use this value in later steps.

    Screenshot of the 'Connection' section of the 'Settings' dialog for a database in the Fabric portal.

Install the Cosmos DB Python SDK package

  • Install the azure-cosmos package in your notebook. This should be version 4.14.0 or later.

    Cell [1]:

    #Install packages
    %pip install azure-cosmos
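
To confirm that the installed package meets the minimum version, a check along the following lines can run in the next cell. This is a minimal sketch rather than part of the original sample: the helper names `parse_version` and `meets_minimum` are hypothetical, and the comparison only looks at the numeric major, minor, and patch components.

```python
import re
from importlib import metadata

MINIMUM_VERSION = "4.14.0"  # minimum azure-cosmos version stated in this article


def parse_version(version: str) -> tuple:
    """Return (major, minor, patch) as ints; hypothetical helper for this sketch."""
    # Only the leading numeric components are read; suffixes such as "b1" are ignored
    match = re.match(r"(\d+)(?:\.(\d+))?(?:\.(\d+))?", version)
    if match is None:
        raise ValueError(f"Unrecognized version string: {version!r}")
    return tuple(int(group or 0) for group in match.groups())


def meets_minimum(installed: str, minimum: str = MINIMUM_VERSION) -> bool:
    """Compare versions numerically so that 4.9.0 sorts below 4.14.0."""
    return parse_version(installed) >= parse_version(minimum)


try:
    installed = metadata.version("azure-cosmos")
    print(f"azure-cosmos {installed}: meets minimum = {meets_minimum(installed)}")
except metadata.PackageNotFoundError:
    print("azure-cosmos is not installed in this session")
```

Comparing tuples of integers rather than raw strings avoids the case where "4.9.0" would sort after "4.14.0" alphabetically.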
    

Import libraries and set configuration values

  • Import the packages into your notebook. This sample and the others in this article use the async library for Cosmos DB. Then set the Cosmos DB endpoint, database name, and container name that you saved in a previous step.

    Cell [2]:

    #Imports and config values
    import logging
    
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos.exceptions import CosmosHttpResponseError
    
    COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
    COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
    COSMOS_CONTAINER_NAME = '{your-container-name}'
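
Because the values above are placeholders, a quick check before creating the client can catch a value that was never replaced. The following is an illustrative sketch only: `find_config_problems` is a hypothetical helper, and the rules it applies (an `https` endpoint and no leftover `{...}` placeholders) are assumptions drawn from the sample values in this article, not requirements documented by the SDK.

```python
from urllib.parse import urlparse


def find_config_problems(endpoint: str, database_name: str, container_name: str) -> list:
    """Return a list of human-readable problems; an empty list means no issue was found."""
    problems = []

    # Assumption: the endpoint copied from the Fabric portal is a full https URL
    parsed = urlparse(endpoint)
    if parsed.scheme != "https" or not parsed.hostname:
        problems.append("endpoint should be a full https:// URL")

    # Flag names that are empty or still contain the {placeholder} braces
    for label, value in (("database name", database_name), ("container name", container_name)):
        if not value or "{" in value or "}" in value:
            problems.append(f"{label} still looks like a placeholder: {value!r}")

    return problems


# Example using the sample endpoint and placeholder database name from this article
print(find_config_problems(
    "https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/",
    "{your-cosmos-artifact-name}",
    "SampleData",
))
```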
    

Create a custom token credential to authenticate

  1. Create a FabricTokenCredential class that wraps the token string generated by the Fabric NotebookUtils credential utilities in a credential object that the Cosmos DB SDK accepts. The SDK requires this object to authenticate the user.

    Note

    Microsoft Fabric notebooks don't support Azure credential objects natively. You can't use DefaultAzureCredential() to authenticate to Cosmos DB in Microsoft Fabric.

    Cell [3]:

    # Custom TokenCredential implementation for Fabric authentication in a notebook
    %pip install azure-core
    import base64
    import json
    from typing import Any, Optional
    
    import notebookutils
    from azure.core.credentials import AccessToken, TokenCredential
    
    class FabricTokenCredential(TokenCredential):
    
        def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                      enable_cae: bool = False, **kwargs: Any) -> AccessToken:
            # Request a token for the Cosmos DB audience from the notebook session
            access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")
            # A JWT has the form header.payload.signature; the payload holds the expiry
            parts = access_token.split(".")
            if len(parts) < 2:
                raise ValueError("Invalid JWT format")
            payload_b64 = parts[1]
            # Restore the base64 padding that JWT encoding strips
            padding = (-len(payload_b64)) % 4
            if padding:
                payload_b64 += "=" * padding
            payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
            payload = json.loads(payload_json)
            # The exp claim is the expiry time in seconds since the Unix epoch
            exp = payload.get("exp")
            if exp is None:
                raise ValueError("exp claim missing in token")
            return AccessToken(token=access_token, expires_on=exp)
    
  2. Create an async Cosmos DB client object and a reference to the Cosmos DB container to use in a notebook.

    Cell [4]:

    # Initialize Cosmos DB client with custom credential and container object
    COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
    DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
    CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)
    
  3. Create an async function to query the Cosmos DB container.

    Cell [5]:

    # Define function to search for all products by category name
    from typing import Any, Dict, List
    
    async def search_products(categoryName: str) -> List[Dict[str, Any]]:
    
        try:
            # Use a parameterized query
            query = """
                SELECT
                    *
                FROM c
                WHERE
                    c.categoryName = @categoryName AND
                    c.docType = @docType
            """
    
            # Set the parameter values
            parameters = [
                {"name": "@docType", "value": "product"},
                {"name": "@categoryName", "value": categoryName}
            ]
    
            # Async query: gather results into a list
            products = [p async for p in CONTAINER.query_items(
                query=query,
                parameters=parameters
            )]
    
            return products
    
        except CosmosHttpResponseError as e:
            logging.error(f"Cosmos DB query failed: {e}")
            raise
        except Exception as e:
            logging.error(f"Unexpected error in search_products: {e}")
            raise
    
  4. Call the newly defined async function to return the results of the query.

    Cell [6]:

    # Search for products in a category
    from pprint import pprint
    
    products = await search_products(categoryName="Computers, Laptops")
    
    display(products)  # For tabular output
    pprint(products)  # JSON-friendly output
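
The expiry handling inside FabricTokenCredential from step 1 can be tried in isolation, without a Fabric session. The sketch below builds an unsigned, JWT-shaped test string locally and applies the same padding and decoding steps. `read_expiry` and `make_test_token` are hypothetical helpers, the token is fabricated for illustration, and no signature is created or verified.

```python
import base64
import json


def read_expiry(token: str) -> int:
    """Return the exp claim from the payload segment of a JWT-shaped string."""
    parts = token.split(".")
    if len(parts) < 2:
        raise ValueError("Invalid JWT format")
    payload_b64 = parts[1]
    # JWT segments drop base64 padding, so restore it to a multiple of 4 characters
    payload_b64 += "=" * ((-len(payload_b64)) % 4)
    payload = json.loads(base64.urlsafe_b64decode(payload_b64))
    exp = payload.get("exp")
    if exp is None:
        raise ValueError("exp claim missing in token")
    return exp


def make_test_token(claims: dict) -> str:
    """Build an unsigned header.payload.signature string for local testing only."""
    def encode(obj: dict) -> str:
        raw = json.dumps(obj).encode("utf-8")
        # Strip the padding, as real JWTs do
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")

    return f"{encode({'alg': 'none'})}.{encode(claims)}.signature"


# Fabricated expiry value, used only to exercise the decoding path
token = make_test_token({"exp": 1767225600})
print(read_expiry(token))
```

Returning the token's own exp value as expires_on lets the SDK know when the credential needs to be requested again, which is why the class decodes the payload instead of supplying a fixed lifetime.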