다음을 통해 공유


Microsoft Fabric의 Python Notebook에서 Cosmos DB 작업

Microsoft Fabric의 Python Notebook에서 Cosmos DB Python SDK를 사용하여 Microsoft Fabric의 Cosmos DB에서 쓰기 및 쿼리 데이터를 읽을 수 있습니다. Cosmos DB 컨테이너를 만들고 관리할 수도 있습니다.

Spark 커넥터를 사용하는 것은 작업을 수행하기 위해 Cosmos DB 엔드포인트에 직접 연결하므로 Spark를 사용하여 OneLake에 저장된 패브릭 미러 데이터의 Cosmos DB에서 데이터를 읽는 것과 다릅니다.

필수 조건

비고

이 문서에서는 SampleData의 컨테이너 이름으로 만든 기본 제공 Cosmos DB 샘플을 사용합니다.

Cosmos DB 엔드포인트 검색

먼저 Fabric에서 Cosmos DB 데이터베이스에 대한 엔드포인트를 가져옵니다. 이 엔드포인트는 Cosmos DB Spark 커넥터를 사용하여 연결하는 데 필요합니다.

  1. Fabric 포털(https://app.fabric.microsoft.com)을 엽니다.

  2. 기존 Cosmos DB 데이터베이스로 이동합니다.

  3. 데이터베이스의 메뉴 모음에서 설정 옵션을 선택합니다.

    패브릭 포털의 데이터베이스에 대한 '설정' 메뉴 모음 옵션의 스크린샷.

  4. 설정 대화 상자에서 연결 섹션으로 이동합니다. 그런 다음 Cosmos DB NoSQL 데이터베이스의 엔드포인트 값을 복사합니다. 이후 단계에서 이 값을 사용합니다.

    패브릭 포털의 데이터베이스에 대한 '설정' 대화 상자의 '연결' 섹션 스크린샷

Cosmos DB Python SDK 패키지 설치

  • Notebook에 azure-cosmos 패키지를 설치합니다. 버전 4.14.0 이상이어야 합니다.

    셀 [1]:

    #Install packages
    %pip install azure-cosmos
    

라이브러리 가져오기 및 구성 값 설정

  • 패키지를 전자 필기장으로 가져옵니다. 이 샘플 및 기타 샘플에서는 Cosmos DB용 비동기 라이브러리를 사용합니다. 그런 다음 이전 단계에서 저장한 Cosmos DB 엔드포인트, 데이터베이스 이름 및 컨테이너 이름을 적용합니다.

    셀 [2]:

    #Imports and config values
    import logging
    
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos.exceptions import CosmosHttpResponseError
    
    COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
    COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
    COSMOS_CONTAINER_NAME = '{your-container-name}'
    

인증할 사용자 지정 토큰 자격 증명 만들기

  1. FabricTokenCredential() 개체를 만들어 사용자를 인증하는 데 필요한 Fabric NotebookUtils 자격 증명 유틸리티 에서 생성된 토큰 문자열에서 Cosmos DB SDK에 대한 유효한 자격 증명 개체를 생성합니다.

    [참고!] Microsoft Fabric Notebook은 기본적으로 Azure 자격 증명 개체를 지원하지 않습니다. Microsoft Fabric에서 Cosmos DB에 인증하는 데 사용할 DefaultAzureCredential() 수 없습니다.

    셀 [3]:

    # Custom TokenCredential implementation for Fabric authentication in a notebook
    %pip install azure-core
    from azure.core.credentials import TokenCredential, AccessToken
    import base64
    import json
    import notebookutils
    from datetime import datetime, timezone
    
    class FabricTokenCredential(TokenCredential):
    
       def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                      enable_cae: bool = False, **kwargs: Any) -> AccessToken:
          access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")
          parts = access_token.split(".")
          if len(parts) < 2:
                raise ValueError("Invalid JWT format")
          payload_b64 = parts[1]
          # Fix padding
          padding = (-len(payload_b64)) % 4
          if padding:
                payload_b64 += "=" * padding
          payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
          payload = json.loads(payload_json)
          exp = payload.get("exp")
          if exp is None:
                raise ValueError("exp claim missing in token")
          return AccessToken(token=access_token, expires_on=exp) 
    
  2. 비동기 Cosmos DB 클라이언트 개체 및 Notebook에서 사용할 Cosmos DB 컨테이너에 대한 참조를 만듭니다.

    셀 [4]:

    # Initialize Cosmos DB client with custom credential and container object
    COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
    DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
    CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)
    
  3. Cosmos DB 컨테이너를 쿼리하는 비동기 함수 만들기

    셀 [5]:

    #Define function to search for all products by category name
    async def search_products(categoryName: str) -> List[Dict[str, Any]]:
    
       try:
          # Use parameterized query
          query = """
             SELECT 
                *
             FROM c 
             WHERE 
                c.categoryName = @categoryName AND
                c.docType = @docType
          """
    
          # Set the parameter values
          parameters = [
                {"name": "@docType", "value": "product"},
                {"name": "@categoryName", "value": categoryName}
          ]
    
          # Async query: gather results into a list
          products = [p async for p in CONTAINER.query_items(
                query=query,
                parameters=parameters
          )]
    
          return products
    
       except CosmosHttpResponseError as e:
          logging.error(f"Cosmos DB query failed: {e}")
          raise
       except Exception as e:
          logging.error(f"Unexpected error in search_products: {e}")
          raise
    
  4. 새로 정의된 비동기 함수를 호출하여 쿼리 결과를 반환합니다.

    셀 [6]:

       # Search for products in a category
       products = await search_products(categoryName="Computers, Laptops")
    
       display(products) #For tabular output
       pprint(products) #Json friendly output