在 Microsoft Fabric 的 Python 笔记本中使用 Cosmos DB

可以在 Microsoft Fabric 中的 Python 笔记本里使用 Cosmos DB Python SDK 从 Microsoft Fabric 中的 Cosmos DB 读取、写入和查询数据。 还可以创建和管理 Cosmos DB 容器。

使用 Spark 连接器与使用 Spark 从 OneLake 中的 Fabric 镜像数据读取 Cosmos DB 的数据不同,因为它直接连接到 Cosmos DB 终结点以执行操作。

小窍门

GitHub 上的 Microsoft Fabric 示例中,从 Cosmos DB 下载完整的示例。

先决条件

  • 包含数据的现有容器

注释

本文使用容器名称为 SampleData 创建的内置 Cosmos DB 示例。

检索 Cosmos DB 终结点

首先,获取 Fabric 中 Cosmos DB 数据库的终结点。 若要使用 Cosmos DB Spark 连接器进行连接,需要使用此终结点。

  1. 打开 Fabric 门户(https://app.fabric.microsoft.com)。

  2. 导航到现有的 Cosmos DB 数据库。

  3. 在数据库的菜单栏中选择 “设置” 选项。

    Fabric 门户中数据库的“设置”菜单栏选项的屏幕截图。

  4. 在“设置”对话框中,导航到 “连接” 部分。 然后,复制 Cosmos DB NoSQL 数据库 字段的 Endpoint 值。 在后续步骤[s]中使用此值。

    Fabric 门户中数据库的“设置”对话框的“连接”部分的屏幕截图。

安装 Cosmos DB Python SDK 包

  • 在笔记本中安装 azure-cosmos 包。 这应该是 4.14.0 或更高版本。

    单元格 [1]:

    #Install packages
    %pip install azure-cosmos
    

导入库并设置配置值

  • 将包导入笔记本中。 在此和其他示例中,我们使用 Cosmos DB 的异步库。 然后应用在上一步中保存的 Cosmos DB 终结点、数据库名称和容器名称。

    单元格 [2]:

    #Imports and config values
    import logging
    
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos.exceptions import CosmosHttpResponseError
    
    COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
    COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
    COSMOS_CONTAINER_NAME = '{your-container-name}'
    

创建自定义令牌凭据以进行身份验证

  1. 创建 FabricTokenCredential() 对象,以便从 Fabric NotebookUtils 凭据实用工具 生成的令牌字符串中为 Cosmos DB SDK 生成有效的凭据对象,这是对用户进行身份验证所必需的。

    [注意! Microsoft Fabric 笔记本不原生支持 Azure 凭据对象。 DefaultAzureCredential() 不能用于在 Microsoft Fabric 中对 Cosmos DB 进行身份验证。

    单元格 [3]:

    # Custom TokenCredential implementation for Fabric authentication in a notebook
    %pip install azure-core
    from azure.core.credentials import TokenCredential, AccessToken
    import base64
    import json
    import notebookutils
    from datetime import datetime, timezone
    
    class FabricTokenCredential(TokenCredential):
    
       def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                      enable_cae: bool = False, **kwargs: Any) -> AccessToken:
          access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")
          parts = access_token.split(".")
          if len(parts) < 2:
                raise ValueError("Invalid JWT format")
          payload_b64 = parts[1]
          # Fix padding
          padding = (-len(payload_b64)) % 4
          if padding:
                payload_b64 += "=" * padding
          payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
          payload = json.loads(payload_json)
          exp = payload.get("exp")
          if exp is None:
                raise ValueError("exp claim missing in token")
          return AccessToken(token=access_token, expires_on=exp) 
    
  2. 创建异步 Cosmos DB 客户端对象和对 Cosmos DB 容器的引用,以在笔记本中使用。

    单元格 [4]:

    # Initialize Cosmos DB client with custom credential and container object
    COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
    DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
    CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)
    
  3. 创建异步函数以查询 Cosmos DB 容器

    单元格 [5]:

    #Define function to search for all products by category name
    async def search_products(categoryName: str) -> List[Dict[str, Any]]:
    
       try:
          # Use parameterized query
          query = """
             SELECT 
                *
             FROM c 
             WHERE 
                c.categoryName = @categoryName AND
                c.docType = @docType
          """
    
          # Set the parameter values
          parameters = [
                {"name": "@docType", "value": "product"},
                {"name": "@categoryName", "value": categoryName}
          ]
    
          # Async query: gather results into a list
          products = [p async for p in CONTAINER.query_items(
                query=query,
                parameters=parameters
          )]
    
          return products
    
       except CosmosHttpResponseError as e:
          logging.error(f"Cosmos DB query failed: {e}")
          raise
       except Exception as e:
          logging.error(f"Unexpected error in search_products: {e}")
          raise
    
  4. 调用新定义的异步函数以返回查询的结果

    单元格 [6]:

       # Search for products in a category
       products = await search_products(categoryName="Computers, Laptops")
    
       display(products) #For tabular output
       pprint(products) #Json friendly output