你可以在 Microsoft Fabric 的 Python 筆記本中使用 Cosmos DB Python SDK 來讀取、寫入和查詢 Microsoft Fabric 中的 Cosmos DB 資料。 你也可以建立和管理 Cosmos DB 容器。
使用 Spark 連接器與使用 Spark 從 Fabric 中 OneLake 儲存的 Cosmos DB 鏡像資料讀取不同,因為它直接連接到 Cosmos DB 端點來執行操作。
小提示
可於 GitHub 的 Microsoft Fabric Samples 下載來自 Cosmos DB 的完整範例。
先決條件
現有的 Fabric 架構容量
- 如果您沒有 Fabric 容量, 請啟動 Fabric 試用版。
Fabric 中現有的 Cosmos DB 資料庫
- 如果您還沒有,請 在 Fabric 中建立新的 Cosmos DB 資料庫。
具有數據的現有容器
- 如果您還沒有範例 數據容器,建議您載入範例數據容器。
備註
本文使用了內建的 Cosmos DB 範例,並以容器名稱 SampleData 建立。
擷取 Cosmos DB 端點
首先,在 Fabric 中取得 Cosmos DB 資料庫的端點。 此端點必須使用 Cosmos DB Spark 連接器連接。
開啟 Fabric 入口網站 (https://app.fabric.microsoft.com)。
流覽至您現有的 Cosmos DB 資料庫。
選取資料庫選單列中的 [ 設定 ] 選項。
在 [設定] 對話框中,流覽至 [ 連線] 區段。 然後,複製 [Cosmos DB NoSQL 資料庫端點 ] 字段的值。 您可以在後面的步驟中使用此值。
安裝 Cosmos DB Python SDK 套件
在你的筆記本中安裝 azure-cosmos 套件。 這應該是 4.14.0 或更新版本。
儲存格 [1]:
#Install packages %pip install azure-cosmos
匯入函式庫與設定設定值
把套件匯入你的筆記本。 在這個範例和其他範例中,我們使用 Cosmos DB 的非同步函式庫。 接著套用你之前儲存的 Cosmos DB 端點、資料庫名稱和容器名稱。
細胞 [2]:
#Imports and config values import logging from azure.cosmos.aio import CosmosClient from azure.cosmos.exceptions import CosmosHttpResponseError COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/' COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}' COSMOS_CONTAINER_NAME = '{your-container-name}'
建立自訂的憑證來認證
建立一個 FabricTokenCredential() 物件,從 Fabric NotebookUtils 憑證工具 所產生的權杖字串中產生有效的憑證物件,此物件用於認證使用者。
[注意!] Microsoft Fabric 筆記型電腦原生不支援 Azure 憑證物件。 你無法使用
DefaultAzureCredential()在 Microsoft Fabric 中對 Cosmos DB 進行驗證。儲存格 [3]:
# Custom TokenCredential implementation for Fabric authentication in a notebook %pip install azure-core from azure.core.credentials import TokenCredential, AccessToken import base64 import json import notebookutils from datetime import datetime, timezone class FabricTokenCredential(TokenCredential): def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, enable_cae: bool = False, **kwargs: Any) -> AccessToken: access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/") parts = access_token.split(".") if len(parts) < 2: raise ValueError("Invalid JWT format") payload_b64 = parts[1] # Fix padding padding = (-len(payload_b64)) % 4 if padding: payload_b64 += "=" * padding payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8") payload = json.loads(payload_json) exp = payload.get("exp") if exp is None: raise ValueError("exp claim missing in token") return AccessToken(token=access_token, expires_on=exp)建立非同步 Cosmos DB 客戶端物件,並設置 Cosmos DB 容器引用,以便在筆記本中使用。
細胞 [4]:
# Initialize Cosmos DB client with custom credential and container object COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential()) DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME) CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)建立一個非同步函式來查詢 Cosmos DB 容器
細胞 [5]:
#Define function to search for all products by category name async def search_products(categoryName: str) -> List[Dict[str, Any]]: try: # Use parameterized query query = """ SELECT * FROM c WHERE c.categoryName = @categoryName AND c.docType = @docType """ # Set the parameter values parameters = [ {"name": "@docType", "value": "product"}, {"name": "@categoryName", "value": categoryName} ] # Async query: gather results into a list products = [p async for p in CONTAINER.query_items( query=query, parameters=parameters )] return products except CosmosHttpResponseError as e: logging.error(f"Cosmos DB query failed: {e}") raise except Exception as e: logging.error(f"Unexpected error in search_products: {e}") raise呼叫新定義的非同步函式以回傳查詢結果
細胞 [6]:
# Search for products in a category products = await search_products(categoryName="Computers, Laptops") display(products) #For tabular output pprint(products) #Json friendly output