可以在 Microsoft Fabric 中的 Python 笔记本里使用 Cosmos DB Python SDK 从 Microsoft Fabric 中的 Cosmos DB 读取、写入和查询数据。 还可以创建和管理 Cosmos DB 容器。
使用 Spark 连接器与使用 Spark 从 OneLake 中的 Fabric 镜像数据读取 Cosmos DB 的数据不同,因为它直接连接到 Cosmos DB 终结点以执行操作。
小窍门
在 GitHub 上的 Microsoft Fabric 示例中,从 Cosmos DB 下载完整的示例。
先决条件
现有Fabric容量
- 如果没有 Fabric 容量, 请启动 Fabric 试用版。
Fabric 中的现有 Cosmos DB 数据库
包含数据的现有容器
- 如果您还没有,请加载示例数据容器。
注释
本文使用容器名称为 SampleData 创建的内置 Cosmos DB 示例。
检索 Cosmos DB 终结点
首先,获取 Fabric 中 Cosmos DB 数据库的终结点。 若要使用 Cosmos DB Spark 连接器进行连接,需要使用此终结点。
打开 Fabric 门户(https://app.fabric.microsoft.com)。
导航到现有的 Cosmos DB 数据库。
在数据库的菜单栏中选择 “设置” 选项。
在“设置”对话框中,导航到 “连接” 部分。 然后,复制 Cosmos DB NoSQL 数据库 字段的 Endpoint 值。 在后续步骤[s]中使用此值。
安装 Cosmos DB Python SDK 包
在笔记本中安装 azure-cosmos 包。 这应该是 4.14.0 或更高版本。
单元格 [1]:
#Install packages %pip install azure-cosmos
导入库并设置配置值
将包导入笔记本中。 在此和其他示例中,我们使用 Cosmos DB 的异步库。 然后应用在上一步中保存的 Cosmos DB 终结点、数据库名称和容器名称。
单元格 [2]:
#Imports and config values import logging from azure.cosmos.aio import CosmosClient from azure.cosmos.exceptions import CosmosHttpResponseError COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/' COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}' COSMOS_CONTAINER_NAME = '{your-container-name}'
创建自定义令牌凭据以进行身份验证
创建 FabricTokenCredential() 对象,以便从 Fabric NotebookUtils 凭据实用工具 生成的令牌字符串中为 Cosmos DB SDK 生成有效的凭据对象,这是对用户进行身份验证所必需的。
[注意! Microsoft Fabric 笔记本不原生支持 Azure 凭据对象。
DefaultAzureCredential()不能用于在 Microsoft Fabric 中对 Cosmos DB 进行身份验证。单元格 [3]:
# Custom TokenCredential implementation for Fabric authentication in a notebook %pip install azure-core from azure.core.credentials import TokenCredential, AccessToken import base64 import json import notebookutils from datetime import datetime, timezone class FabricTokenCredential(TokenCredential): def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, enable_cae: bool = False, **kwargs: Any) -> AccessToken: access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/") parts = access_token.split(".") if len(parts) < 2: raise ValueError("Invalid JWT format") payload_b64 = parts[1] # Fix padding padding = (-len(payload_b64)) % 4 if padding: payload_b64 += "=" * padding payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8") payload = json.loads(payload_json) exp = payload.get("exp") if exp is None: raise ValueError("exp claim missing in token") return AccessToken(token=access_token, expires_on=exp)创建异步 Cosmos DB 客户端对象和对 Cosmos DB 容器的引用,以在笔记本中使用。
单元格 [4]:
# Initialize Cosmos DB client with custom credential and container object COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential()) DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME) CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)创建异步函数以查询 Cosmos DB 容器
单元格 [5]:
#Define function to search for all products by category name async def search_products(categoryName: str) -> List[Dict[str, Any]]: try: # Use parameterized query query = """ SELECT * FROM c WHERE c.categoryName = @categoryName AND c.docType = @docType """ # Set the parameter values parameters = [ {"name": "@docType", "value": "product"}, {"name": "@categoryName", "value": categoryName} ] # Async query: gather results into a list products = [p async for p in CONTAINER.query_items( query=query, parameters=parameters )] return products except CosmosHttpResponseError as e: logging.error(f"Cosmos DB query failed: {e}") raise except Exception as e: logging.error(f"Unexpected error in search_products: {e}") raise调用新定义的异步函数以返回查询的结果
单元格 [6]:
# Search for products in a category products = await search_products(categoryName="Computers, Laptops") display(products) #For tabular output pprint(products) #Json friendly output