แชร์ผ่าน


ทํางานกับ Cosmos DB ในสมุดบันทึก Python ใน Microsoft Fabric

คุณสามารถใช้ Cosmos DB Python SDK ในสมุดบันทึก Python ใน Microsoft Fabric เพื่ออ่าน เขียน และคิวรีข้อมูลจาก Cosmos DB ใน Microsoft Fabric คุณยังสามารถสร้างและจัดการคอนเทนเนอร์ Cosmos DB ได้อีกด้วย

การใช้ตัวเชื่อมต่อ Spark แตกต่างจากการใช้ Spark เพื่ออ่านข้อมูลจาก Cosmos DB ในข้อมูลที่มิเรอร์ Fabric ที่จัดเก็บไว้ใน OneLake เนื่องจากเชื่อมต่อโดยตรงกับตําแหน่งข้อมูล Cosmos DB เพื่อดําเนินการ

เคล็ดลับ

ดาวน์โหลดตัวอย่างที่สมบูรณ์จาก Cosmos DB ในตัวอย่าง Microsoft Fabric บน GitHub

ข้อกําหนดเบื้องต้น

Note

บทความนี้ใช้ตัวอย่าง Cosmos DB ที่มีอยู่แล้วภายในที่สร้างขึ้นด้วยชื่อคอนเทนเนอร์ของ SampleData

ดึงข้อมูลตําแหน่งข้อมูล Cosmos DB

ขั้นแรก รับตําแหน่งข้อมูลสําหรับฐานข้อมูล Cosmos DB ใน Fabric จุดสิ้นสุดนี้จําเป็นสําหรับการเชื่อมต่อโดยใช้ Cosmos DB Spark Connector

  1. เปิดพอร์ทัล Fabric (https://app.fabric.microsoft.com)

  2. นําทางไปยังฐานข้อมูล Cosmos DB ที่มีอยู่ของคุณ

  3. เลือกตัวเลือก การตั้งค่า ในแถบเมนูสําหรับฐานข้อมูล

    สกรีนช็อตของตัวเลือกแถบเมนู 'การตั้งค่า' สําหรับฐานข้อมูลในพอร์ทัล Fabric

  4. ในกล่องโต้ตอบการตั้งค่า ให้ไปที่ส่วนการเชื่อมต่อ จากนั้น คัดลอกค่าของฟิลด์ฐานข้อมูล Endpoint for Cosmos DB NoSQL คุณใช้ค่านี้ในขั้นตอนต่อมา

    สกรีนช็อตของส่วน 'การเชื่อมต่อ' ของกล่องโต้ตอบ 'การตั้งค่า' สําหรับฐานข้อมูลในพอร์ทัล Fabric

ติดตั้งแพ็คเกจ Cosmos DB Python SDK

  • ติดตั้งแพคเกจ azure-cosmos ในสมุดบันทึกของคุณ ควรเป็นเวอร์ชัน 4.14.0 หรือใหม่กว่า

    เซลล์ [1]:

    #Install packages
    %pip install azure-cosmos
    

นําเข้าไลบรารีและตั้งค่าการกําหนดค่า

  • นําเข้าแพ็คเกจลงในสมุดบันทึกของคุณ ในตัวอย่างนี้และตัวอย่างอื่นๆ เราใช้ไลบรารีแบบอะซิงโครนัสสําหรับ Cosmos DB จากนั้นใช้ตําแหน่งข้อมูล Cosmos DB ชื่อฐานข้อมูล และชื่อคอนเทนเนอร์ที่คุณบันทึกไว้ในขั้นตอนก่อนหน้า

    เซลล์ [2]:

    #Imports and config values
    import logging
    
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos.exceptions import CosmosHttpResponseError
    
    COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
    COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
    COSMOS_CONTAINER_NAME = '{your-container-name}'
    

สร้างข้อมูลประจําตัวโทเค็นแบบกําหนดเองเพื่อรับรองความถูกต้อง

  1. สร้างวัตถุ FabricTokenCredential() เพื่อสร้างวัตถุข้อมูลประจําตัวที่ถูกต้องสําหรับ Cosmos DB SDK จากสตริงโทเค็นที่สร้างขึ้นโดย ยูทิลิตี้ข้อมูลประจําตัว Fabric NotebookUtils ซึ่งจําเป็นในการรับรองความถูกต้องของผู้ใช้

    [หมายเหตุ] สมุดบันทึก Microsoft Fabric ไม่สนับสนุนออบเจ็กต์ Azure Credential โดยกําเนิด คุณไม่สามารถใช้เพื่อ DefaultAzureCredential() รับรองความถูกต้องกับ Cosmos DB ใน Microsoft Fabric

    เซลล์ [3]:

    # Custom TokenCredential implementation for Fabric authentication in a notebook
    %pip install azure-core
    from azure.core.credentials import TokenCredential, AccessToken
    import base64
    import json
    import notebookutils
    from datetime import datetime, timezone
    
    class FabricTokenCredential(TokenCredential):
    
       def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                      enable_cae: bool = False, **kwargs: Any) -> AccessToken:
          access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")
          parts = access_token.split(".")
          if len(parts) < 2:
                raise ValueError("Invalid JWT format")
          payload_b64 = parts[1]
          # Fix padding
          padding = (-len(payload_b64)) % 4
          if padding:
                payload_b64 += "=" * padding
          payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
          payload = json.loads(payload_json)
          exp = payload.get("exp")
          if exp is None:
                raise ValueError("exp claim missing in token")
          return AccessToken(token=access_token, expires_on=exp) 
    
  2. สร้างออบเจ็กต์ไคลเอ็นต์ Cosmos DB แบบอะซิงโครนัสและการอ้างอิงไปยังคอนเทนเนอร์ Cosmos DB เพื่อใช้ในสมุดบันทึก

    เซลล์ [4]:

    # Initialize Cosmos DB client with custom credential and container object
    COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
    DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
    CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)
    
  3. สร้างฟังก์ชันแบบอะซิงโครนัสเพื่อสืบค้นคอนเทนเนอร์ Cosmos DB

    เซลล์ [5]:

    #Define function to search for all products by category name
    async def search_products(categoryName: str) -> List[Dict[str, Any]]:
    
       try:
          # Use parameterized query
          query = """
             SELECT 
                *
             FROM c 
             WHERE 
                c.categoryName = @categoryName AND
                c.docType = @docType
          """
    
          # Set the parameter values
          parameters = [
                {"name": "@docType", "value": "product"},
                {"name": "@categoryName", "value": categoryName}
          ]
    
          # Async query: gather results into a list
          products = [p async for p in CONTAINER.query_items(
                query=query,
                parameters=parameters
          )]
    
          return products
    
       except CosmosHttpResponseError as e:
          logging.error(f"Cosmos DB query failed: {e}")
          raise
       except Exception as e:
          logging.error(f"Unexpected error in search_products: {e}")
          raise
    
  4. เรียกใช้ฟังก์ชัน async ที่กําหนดใหม่เพื่อส่งกลับผลลัพธ์ของแบบสอบถาม

    เซลล์ [6]:

       # Search for products in a category
       products = await search_products(categoryName="Computers, Laptops")
    
       display(products) #For tabular output
       pprint(products) #Json friendly output