Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Puede usar el SDK de Python de Cosmos DB en un cuaderno de Python de Microsoft Fabric para leer datos de escritura y consulta de Cosmos DB en Microsoft Fabric. También puede crear y administrar contenedores de Cosmos DB.
El uso del conector Spark es diferente al uso de Spark para leer datos de Cosmos DB en los datos replicados de Fabric almacenados en OneLake, ya que se conecta directamente al endpoint de Cosmos DB para realizar operaciones.
Sugerencia
Descargue el ejemplo completo de Cosmos DB en Ejemplos de Microsoft Fabric en GitHub.
Prerrequisitos
Una existente capacidad de Fabric
- Si no tiene capacidad de Fabric, inicie una versión de prueba de Fabric.
Una base de datos de Cosmos DB existente en Fabric
- Si aún no tiene una, cree una base de datos de Cosmos DB en Fabric.
Un contenedor existente con datos
- Si aún no tiene una, se recomienda cargar el contenedor de datos de ejemplo.
Nota:
En este artículo se usa el ejemplo integrado de Cosmos DB creado con un nombre de contenedor de SampleData.
Recuperación del punto de conexión de Cosmos DB
Primero, obtenga el punto de conexión de la base de datos de Cosmos DB dentro de Fabric. Este punto de conexión es necesario para conectarse mediante el conector spark de Cosmos DB.
Abra el portal de Fabric (https://app.fabric.microsoft.com).
Acceda a su base de datos existente de Cosmos DB.
Seleccione la opción Configuración en la barra de menús de la base de datos.
En el cuadro de diálogo configuración, vaya a la sección Conexión . A continuación, copie el valor del campo Punto de conexión para la base de datos NoSQL de Cosmos DB . Usted utiliza este valor en pasos posteriores.
Instalación del paquete del SDK de Python de Cosmos DB
Instale el paquete azure-cosmos en el cuaderno. Debe ser la versión 4.14.0 o posterior.
Celda [1]:
#Install packages %pip install azure-cosmos
Importar bibliotecas y establecer valores de configuración
Importe los paquetes en el cuaderno. En este y otros ejemplos se usa la biblioteca asincrónica para Cosmos DB. A continuación, aplique el punto de conexión de Cosmos DB, el nombre de la base de datos y el nombre del contenedor que guardó en un paso anterior.
Celda [2]:
#Imports and config values import logging from azure.cosmos.aio import CosmosClient from azure.cosmos.exceptions import CosmosHttpResponseError COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/' COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}' COSMOS_CONTAINER_NAME = '{your-container-name}'
Creación de una credencial de token personalizada para autenticarse
Cree un objeto FabricTokenCredential() para generar un objeto de credencial válido para el SDK de Cosmos DB a partir de la cadena de token generada por las utilidades de credenciales de Fabric NotebookUtils necesarias para autenticar a un usuario.
[¡NOTA!] Los cuadernos de Microsoft Fabric no admiten objetos de credenciales de Azure de forma nativa. No se puede usar
DefaultAzureCredential()para autenticarse en Cosmos DB en Microsoft Fabric.Celda [3]:
# Custom TokenCredential implementation for Fabric authentication in a notebook %pip install azure-core from azure.core.credentials import TokenCredential, AccessToken import base64 import json import notebookutils from datetime import datetime, timezone class FabricTokenCredential(TokenCredential): def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, enable_cae: bool = False, **kwargs: Any) -> AccessToken: access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/") parts = access_token.split(".") if len(parts) < 2: raise ValueError("Invalid JWT format") payload_b64 = parts[1] # Fix padding padding = (-len(payload_b64)) % 4 if padding: payload_b64 += "=" * padding payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8") payload = json.loads(payload_json) exp = payload.get("exp") if exp is None: raise ValueError("exp claim missing in token") return AccessToken(token=access_token, expires_on=exp)Cree un objeto de cliente asincrónico de Cosmos DB y una referencia al contenedor de Cosmos DB que se usará en un cuaderno.
Celda [4]:
# Initialize Cosmos DB client with custom credential and container object COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential()) DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME) CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)Creación de una función asincrónica para consultar el contenedor de Cosmos DB
Celda [5]:
#Define function to search for all products by category name async def search_products(categoryName: str) -> List[Dict[str, Any]]: try: # Use parameterized query query = """ SELECT * FROM c WHERE c.categoryName = @categoryName AND c.docType = @docType """ # Set the parameter values parameters = [ {"name": "@docType", "value": "product"}, {"name": "@categoryName", "value": categoryName} ] # Async query: gather results into a list products = [p async for p in CONTAINER.query_items( query=query, parameters=parameters )] return products except CosmosHttpResponseError as e: logging.error(f"Cosmos DB query failed: {e}") raise except Exception as e: logging.error(f"Unexpected error in search_products: {e}") raiseLlame a la función asincrónica recién definida para devolver los resultados de la consulta.
Celda [6]:
# Search for products in a category products = await search_products(categoryName="Computers, Laptops") display(products) #For tabular output pprint(products) #Json friendly output