หมายเหตุ
การเข้าถึงหน้านี้ต้องได้รับการอนุญาต คุณสามารถลอง ลงชื่อเข้าใช้หรือเปลี่ยนไดเรกทอรีได้
การเข้าถึงหน้านี้ต้องได้รับการอนุญาต คุณสามารถลองเปลี่ยนไดเรกทอรีได้
คุณสามารถใช้ Cosmos DB Python SDK ในสมุดบันทึก Python ใน Microsoft Fabric เพื่ออ่าน เขียน และคิวรีข้อมูลจาก Cosmos DB ใน Microsoft Fabric คุณยังสามารถสร้างและจัดการคอนเทนเนอร์ Cosmos DB ได้อีกด้วย
การใช้ตัวเชื่อมต่อ Spark แตกต่างจากการใช้ Spark เพื่ออ่านข้อมูลจาก Cosmos DB ในข้อมูลที่มิเรอร์ Fabric ที่จัดเก็บไว้ใน OneLake เนื่องจากเชื่อมต่อโดยตรงกับตําแหน่งข้อมูล Cosmos DB เพื่อดําเนินการ
เคล็ดลับ
ดาวน์โหลดตัวอย่างที่สมบูรณ์จาก Cosmos DB ในตัวอย่าง Microsoft Fabric บน GitHub
ข้อกําหนดเบื้องต้น
ความจุ Fabric ที่มีอยู่
- ถ้าคุณไม่มีความจุ Fabric ให้เริ่มการทดลองใช้ Fabric
ฐานข้อมูล Cosmos DB ที่มีอยู่ใน Fabric
- ถ้าคุณยังไม่มี ให้สร้างฐานข้อมูล Cosmos DB ใหม่ใน Fabric
คอนเทนเนอร์ที่มีอยู่พร้อมข้อมูล
- ถ้าคุณยังไม่มี เราขอแนะนําให้คุณโหลดคอนเทนเนอร์ข้อมูลตัวอย่าง
Note
บทความนี้ใช้ตัวอย่าง Cosmos DB ที่มีอยู่แล้วภายในที่สร้างขึ้นด้วยชื่อคอนเทนเนอร์ของ SampleData
ดึงข้อมูลตําแหน่งข้อมูล Cosmos DB
ขั้นแรก รับตําแหน่งข้อมูลสําหรับฐานข้อมูล Cosmos DB ใน Fabric จุดสิ้นสุดนี้จําเป็นสําหรับการเชื่อมต่อโดยใช้ Cosmos DB Spark Connector
เปิดพอร์ทัล Fabric (https://app.fabric.microsoft.com)
นําทางไปยังฐานข้อมูล Cosmos DB ที่มีอยู่ของคุณ
เลือกตัวเลือก การตั้งค่า ในแถบเมนูสําหรับฐานข้อมูล
ในกล่องโต้ตอบการตั้งค่า ให้ไปที่ส่วนการเชื่อมต่อ จากนั้น คัดลอกค่าของฟิลด์ฐานข้อมูล Endpoint for Cosmos DB NoSQL คุณใช้ค่านี้ในขั้นตอนต่อมา
ติดตั้งแพ็คเกจ Cosmos DB Python SDK
ติดตั้งแพคเกจ azure-cosmos ในสมุดบันทึกของคุณ ควรเป็นเวอร์ชัน 4.14.0 หรือใหม่กว่า
เซลล์ [1]:
#Install packages %pip install azure-cosmos
นําเข้าไลบรารีและตั้งค่าการกําหนดค่า
นําเข้าแพ็คเกจลงในสมุดบันทึกของคุณ ในตัวอย่างนี้และตัวอย่างอื่นๆ เราใช้ไลบรารีแบบอะซิงโครนัสสําหรับ Cosmos DB จากนั้นใช้ตําแหน่งข้อมูล Cosmos DB ชื่อฐานข้อมูล และชื่อคอนเทนเนอร์ที่คุณบันทึกไว้ในขั้นตอนก่อนหน้า
เซลล์ [2]:
#Imports and config values import logging from azure.cosmos.aio import CosmosClient from azure.cosmos.exceptions import CosmosHttpResponseError COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/' COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}' COSMOS_CONTAINER_NAME = '{your-container-name}'
สร้างข้อมูลประจําตัวโทเค็นแบบกําหนดเองเพื่อรับรองความถูกต้อง
สร้างวัตถุ FabricTokenCredential() เพื่อสร้างวัตถุข้อมูลประจําตัวที่ถูกต้องสําหรับ Cosmos DB SDK จากสตริงโทเค็นที่สร้างขึ้นโดย ยูทิลิตี้ข้อมูลประจําตัว Fabric NotebookUtils ซึ่งจําเป็นในการรับรองความถูกต้องของผู้ใช้
[หมายเหตุ] สมุดบันทึก Microsoft Fabric ไม่สนับสนุนออบเจ็กต์ Azure Credential โดยกําเนิด คุณไม่สามารถใช้เพื่อ
DefaultAzureCredential()รับรองความถูกต้องกับ Cosmos DB ใน Microsoft Fabricเซลล์ [3]:
# Custom TokenCredential implementation for Fabric authentication in a notebook %pip install azure-core from azure.core.credentials import TokenCredential, AccessToken import base64 import json import notebookutils from datetime import datetime, timezone class FabricTokenCredential(TokenCredential): def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None, enable_cae: bool = False, **kwargs: Any) -> AccessToken: access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/") parts = access_token.split(".") if len(parts) < 2: raise ValueError("Invalid JWT format") payload_b64 = parts[1] # Fix padding padding = (-len(payload_b64)) % 4 if padding: payload_b64 += "=" * padding payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8") payload = json.loads(payload_json) exp = payload.get("exp") if exp is None: raise ValueError("exp claim missing in token") return AccessToken(token=access_token, expires_on=exp)สร้างออบเจ็กต์ไคลเอ็นต์ Cosmos DB แบบอะซิงโครนัสและการอ้างอิงไปยังคอนเทนเนอร์ Cosmos DB เพื่อใช้ในสมุดบันทึก
เซลล์ [4]:
# Initialize Cosmos DB client with custom credential and container object COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential()) DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME) CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)สร้างฟังก์ชันแบบอะซิงโครนัสเพื่อสืบค้นคอนเทนเนอร์ Cosmos DB
เซลล์ [5]:
#Define function to search for all products by category name async def search_products(categoryName: str) -> List[Dict[str, Any]]: try: # Use parameterized query query = """ SELECT * FROM c WHERE c.categoryName = @categoryName AND c.docType = @docType """ # Set the parameter values parameters = [ {"name": "@docType", "value": "product"}, {"name": "@categoryName", "value": categoryName} ] # Async query: gather results into a list products = [p async for p in CONTAINER.query_items( query=query, parameters=parameters )] return products except CosmosHttpResponseError as e: logging.error(f"Cosmos DB query failed: {e}") raise except Exception as e: logging.error(f"Unexpected error in search_products: {e}") raiseเรียกใช้ฟังก์ชัน async ที่กําหนดใหม่เพื่อส่งกลับผลลัพธ์ของแบบสอบถาม
เซลล์ [6]:
# Search for products in a category products = await search_products(categoryName="Computers, Laptops") display(products) #For tabular output pprint(products) #Json friendly output