Compartir a través de


Trabajar con Cosmos DB en un cuaderno de Python en Microsoft Fabric

Puede usar el SDK de Python de Cosmos DB en un cuaderno de Python de Microsoft Fabric para leer datos de escritura y consulta de Cosmos DB en Microsoft Fabric. También puede crear y administrar contenedores de Cosmos DB.

El uso del conector Spark es diferente al uso de Spark para leer datos de Cosmos DB en los datos replicados de Fabric almacenados en OneLake, ya que se conecta directamente al endpoint de Cosmos DB para realizar operaciones.

Sugerencia

Descargue el ejemplo completo de Cosmos DB en Ejemplos de Microsoft Fabric en GitHub.

Prerrequisitos

Nota:

En este artículo se usa el ejemplo integrado de Cosmos DB creado con un nombre de contenedor de SampleData.

Recuperación del punto de conexión de Cosmos DB

Primero, obtenga el punto de conexión de la base de datos de Cosmos DB dentro de Fabric. Este punto de conexión es necesario para conectarse mediante el conector spark de Cosmos DB.

  1. Abra el portal de Fabric (https://app.fabric.microsoft.com).

  2. Acceda a su base de datos existente de Cosmos DB.

  3. Seleccione la opción Configuración en la barra de menús de la base de datos.

    Captura de pantalla de la opción de barra de menús

  4. En el cuadro de diálogo configuración, vaya a la sección Conexión . A continuación, copie el valor del campo Punto de conexión para la base de datos NoSQL de Cosmos DB . Usted utiliza este valor en pasos posteriores.

    Captura de pantalla de la sección

Instalación del paquete del SDK de Python de Cosmos DB

  • Instale el paquete azure-cosmos en el cuaderno. Debe ser la versión 4.14.0 o posterior.

    Celda [1]:

    #Install packages
    %pip install azure-cosmos
    

Importar bibliotecas y establecer valores de configuración

  • Importe los paquetes en el cuaderno. En este y otros ejemplos se usa la biblioteca asincrónica para Cosmos DB. A continuación, aplique el punto de conexión de Cosmos DB, el nombre de la base de datos y el nombre del contenedor que guardó en un paso anterior.

    Celda [2]:

    #Imports and config values
    import logging
    
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos.exceptions import CosmosHttpResponseError
    
    COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
    COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
    COSMOS_CONTAINER_NAME = '{your-container-name}'
    

Creación de una credencial de token personalizada para autenticarse

  1. Cree un objeto FabricTokenCredential() para generar un objeto de credencial válido para el SDK de Cosmos DB a partir de la cadena de token generada por las utilidades de credenciales de Fabric NotebookUtils necesarias para autenticar a un usuario.

    [¡NOTA!] Los cuadernos de Microsoft Fabric no admiten objetos de credenciales de Azure de forma nativa. No se puede usar DefaultAzureCredential() para autenticarse en Cosmos DB en Microsoft Fabric.

    Celda [3]:

    # Custom TokenCredential implementation for Fabric authentication in a notebook
    %pip install azure-core
    from azure.core.credentials import TokenCredential, AccessToken
    import base64
    import json
    import notebookutils
    from datetime import datetime, timezone
    
    class FabricTokenCredential(TokenCredential):
    
       def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                      enable_cae: bool = False, **kwargs: Any) -> AccessToken:
          access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")
          parts = access_token.split(".")
          if len(parts) < 2:
                raise ValueError("Invalid JWT format")
          payload_b64 = parts[1]
          # Fix padding
          padding = (-len(payload_b64)) % 4
          if padding:
                payload_b64 += "=" * padding
          payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
          payload = json.loads(payload_json)
          exp = payload.get("exp")
          if exp is None:
                raise ValueError("exp claim missing in token")
          return AccessToken(token=access_token, expires_on=exp) 
    
  2. Cree un objeto de cliente asincrónico de Cosmos DB y una referencia al contenedor de Cosmos DB que se usará en un cuaderno.

    Celda [4]:

    # Initialize Cosmos DB client with custom credential and container object
    COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
    DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
    CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)
    
  3. Creación de una función asincrónica para consultar el contenedor de Cosmos DB

    Celda [5]:

    #Define function to search for all products by category name
    async def search_products(categoryName: str) -> List[Dict[str, Any]]:
    
       try:
          # Use parameterized query
          query = """
             SELECT 
                *
             FROM c 
             WHERE 
                c.categoryName = @categoryName AND
                c.docType = @docType
          """
    
          # Set the parameter values
          parameters = [
                {"name": "@docType", "value": "product"},
                {"name": "@categoryName", "value": categoryName}
          ]
    
          # Async query: gather results into a list
          products = [p async for p in CONTAINER.query_items(
                query=query,
                parameters=parameters
          )]
    
          return products
    
       except CosmosHttpResponseError as e:
          logging.error(f"Cosmos DB query failed: {e}")
          raise
       except Exception as e:
          logging.error(f"Unexpected error in search_products: {e}")
          raise
    
  4. Llame a la función asincrónica recién definida para devolver los resultados de la consulta.

    Celda [6]:

       # Search for products in a category
       products = await search_products(categoryName="Computers, Laptops")
    
       display(products) #For tabular output
       pprint(products) #Json friendly output