共用方式為


在 Microsoft Fabric 的 Python 筆記本中使用 Cosmos DB

你可以在 Microsoft Fabric 的 Python 筆記本中使用 Cosmos DB Python SDK 來讀取、寫入和查詢 Microsoft Fabric 中的 Cosmos DB 資料。 你也可以建立和管理 Cosmos DB 容器。

使用 Spark 連接器與使用 Spark 從 Fabric 中 OneLake 儲存的 Cosmos DB 鏡像資料讀取不同,因為它直接連接到 Cosmos DB 端點來執行操作。

小提示

可於 GitHub 的 Microsoft Fabric Samples 下載來自 Cosmos DB 的完整範例。

先決條件

備註

本文使用了內建的 Cosmos DB 範例,並以容器名稱 SampleData 建立。

擷取 Cosmos DB 端點

首先,在 Fabric 中取得 Cosmos DB 資料庫的端點。 此端點必須使用 Cosmos DB Spark 連接器連接。

  1. 開啟 Fabric 入口網站 (https://app.fabric.microsoft.com)。

  2. 流覽至您現有的 Cosmos DB 資料庫。

  3. 選取資料庫選單列中的 [ 設定 ] 選項。

    網狀架構入口網站中資料庫 [設定] 功能表欄選項的螢幕快照。

  4. 在 [設定] 對話框中,流覽至 [ 連線] 區段。 然後,複製 [Cosmos DB NoSQL 資料庫端點 ] 字段的值。 您可以在後面的步驟中使用此值。

    網狀架構入口網站中資料庫 [設定] 對話框 [連線] 區段的螢幕快照。

安裝 Cosmos DB Python SDK 套件

  • 在你的筆記本中安裝 azure-cosmos 套件。 這應該是 4.14.0 或更新版本。

    儲存格 [1]:

    #Install packages
    %pip install azure-cosmos
    

匯入函式庫與設定設定值

  • 把套件匯入你的筆記本。 在這個範例和其他範例中,我們使用 Cosmos DB 的非同步函式庫。 接著套用你之前儲存的 Cosmos DB 端點、資料庫名稱和容器名稱。

    細胞 [2]:

    #Imports and config values
    import logging
    
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos.exceptions import CosmosHttpResponseError
    
    COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
    COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
    COSMOS_CONTAINER_NAME = '{your-container-name}'
    

建立自訂的憑證來認證

  1. 建立一個 FabricTokenCredential() 物件,從 Fabric NotebookUtils 憑證工具 所產生的權杖字串中產生有效的憑證物件,此物件用於認證使用者。

    [注意!] Microsoft Fabric 筆記型電腦原生不支援 Azure 憑證物件。 你無法使用DefaultAzureCredential()在 Microsoft Fabric 中對 Cosmos DB 進行驗證。

    儲存格 [3]:

    # Custom TokenCredential implementation for Fabric authentication in a notebook
    %pip install azure-core
    from azure.core.credentials import TokenCredential, AccessToken
    import base64
    import json
    import notebookutils
    from datetime import datetime, timezone
    
    class FabricTokenCredential(TokenCredential):
    
       def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                      enable_cae: bool = False, **kwargs: Any) -> AccessToken:
          access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")
          parts = access_token.split(".")
          if len(parts) < 2:
                raise ValueError("Invalid JWT format")
          payload_b64 = parts[1]
          # Fix padding
          padding = (-len(payload_b64)) % 4
          if padding:
                payload_b64 += "=" * padding
          payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
          payload = json.loads(payload_json)
          exp = payload.get("exp")
          if exp is None:
                raise ValueError("exp claim missing in token")
          return AccessToken(token=access_token, expires_on=exp) 
    
  2. 建立非同步 Cosmos DB 客戶端物件,並設置 Cosmos DB 容器引用,以便在筆記本中使用。

    細胞 [4]:

    # Initialize Cosmos DB client with custom credential and container object
    COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
    DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
    CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)
    
  3. 建立一個非同步函式來查詢 Cosmos DB 容器

    細胞 [5]:

    #Define function to search for all products by category name
    async def search_products(categoryName: str) -> List[Dict[str, Any]]:
    
       try:
          # Use parameterized query
          query = """
             SELECT 
                *
             FROM c 
             WHERE 
                c.categoryName = @categoryName AND
                c.docType = @docType
          """
    
          # Set the parameter values
          parameters = [
                {"name": "@docType", "value": "product"},
                {"name": "@categoryName", "value": categoryName}
          ]
    
          # Async query: gather results into a list
          products = [p async for p in CONTAINER.query_items(
                query=query,
                parameters=parameters
          )]
    
          return products
    
       except CosmosHttpResponseError as e:
          logging.error(f"Cosmos DB query failed: {e}")
          raise
       except Exception as e:
          logging.error(f"Unexpected error in search_products: {e}")
          raise
    
  4. 呼叫新定義的非同步函式以回傳查詢結果

    細胞 [6]:

       # Search for products in a category
       products = await search_products(categoryName="Computers, Laptops")
    
       display(products) #For tabular output
       pprint(products) #Json friendly output