Sdílet prostřednictvím


Práce se službou Cosmos DB v poznámkovém bloku Pythonu v Microsoft Fabric

Sadu Python SDK služby Cosmos DB můžete použít v poznámkovém bloku Pythonu v Microsoft Fabric ke čtení zápisu a dotazování dat ze služby Cosmos DB v Microsoft Fabric. Můžete také vytvářet a spravovat kontejnery Cosmos DB.

Použití konektoru Spark se liší od použití Sparku ke čtení dat z Cosmos DB, které jsou v rámci Fabric zrcadleně uloženy v OneLake, protože se připojuje přímo ke koncovému bodu Cosmos DB, aby prováděl operace.

Návod

Stáhněte si kompletní ukázku ze služby Cosmos DB v ukázkách Microsoft Fabric na GitHubu.

Požadavky

Poznámka:

Tento článek používá integrovanou ukázku Cosmos DB vytvořenou s názvem kontejneru SampleData.

Načtení koncového bodu Cosmos DB

Nejprve najděte koncový bod pro databázi Cosmos DB ve Fabricu. Tento koncový bod se vyžaduje pro připojení pomocí konektoru Spark služby Cosmos DB.

  1. Otevřete portál Fabric (https://app.fabric.microsoft.com).

  2. Přejděte do existující databáze Cosmos DB.

  3. V řádku nabídek databáze vyberte možnost Nastavení .

    Snímek obrazovky s možností nabídky Nastavení pro databázi na portálu Fabric

  4. V dialogovém okně nastavení přejděte do části Připojení . Pak zkopírujte hodnotu koncového bodu pro databázi NoSQL služby Cosmos DB. Tuto hodnotu použijete v pozdějších krocích.

    Snímek obrazovky s částí Připojení v dialogovém okně Nastavení pro databázi na portálu Fabric

Instalace balíčku Python SDK služby Cosmos DB

  • Nainstalujte balíček azure-cosmos do poznámkového bloku. Mělo by to být verze 4.14.0 nebo novější.

    Buňka [1]:

    #Install packages
    %pip install azure-cosmos
    

Import knihoven a nastavení konfiguračních hodnot

  • Importujte balíčky do poznámkového bloku. V této a dalších ukázkách používáme asynchronní knihovnu pro Cosmos DB. Pak použijte koncový bod služby Cosmos DB, název databáze a název kontejneru, které jste uložili v předchozím kroku.

    Buňka [2]:

    #Imports and config values
    import logging
    
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos.exceptions import CosmosHttpResponseError
    
    COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
    COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
    COSMOS_CONTAINER_NAME = '{your-container-name}'
    

Vytvořte vlastní přihlašovací token pro ověření

  1. Vytvořte objekt FabricTokenCredential(), který vytvoří platný objekt přihlašovacích údajů pro sadu SDK služby Cosmos DB z řetězce tokenu vygenerovaného nástroji pro přihlašovací údaje Fabric NotebookUtils , které se vyžadují k ověření uživatele.

    [POZNÁMKA!] Poznámkové bloky Microsoft Fabric nativně nepodporují objekty přihlašovacích údajů Azure. K ověření ve službě Cosmos DB v Microsoft Fabric se nedá použít DefaultAzureCredential() .

    Buňka [3]:

    # Custom TokenCredential implementation for Fabric authentication in a notebook
    %pip install azure-core
    from azure.core.credentials import TokenCredential, AccessToken
    import base64
    import json
    import notebookutils
    from datetime import datetime, timezone
    
    class FabricTokenCredential(TokenCredential):
    
       def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                      enable_cae: bool = False, **kwargs: Any) -> AccessToken:
          access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")
          parts = access_token.split(".")
          if len(parts) < 2:
                raise ValueError("Invalid JWT format")
          payload_b64 = parts[1]
          # Fix padding
          padding = (-len(payload_b64)) % 4
          if padding:
                payload_b64 += "=" * padding
          payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
          payload = json.loads(payload_json)
          exp = payload.get("exp")
          if exp is None:
                raise ValueError("exp claim missing in token")
          return AccessToken(token=access_token, expires_on=exp) 
    
  2. Vytvořte asynchronní objekt klienta Cosmos DB a odkaz na kontejner Cosmos DB, který se má použít v poznámkovém bloku.

    Buňka [4]:

    # Initialize Cosmos DB client with custom credential and container object
    COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
    DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
    CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)
    
  3. Vytvoření asynchronní funkce pro dotazování kontejneru Cosmos DB

    Buňka [5]:

    #Define function to search for all products by category name
    async def search_products(categoryName: str) -> List[Dict[str, Any]]:
    
       try:
          # Use parameterized query
          query = """
             SELECT 
                *
             FROM c 
             WHERE 
                c.categoryName = @categoryName AND
                c.docType = @docType
          """
    
          # Set the parameter values
          parameters = [
                {"name": "@docType", "value": "product"},
                {"name": "@categoryName", "value": categoryName}
          ]
    
          # Async query: gather results into a list
          products = [p async for p in CONTAINER.query_items(
                query=query,
                parameters=parameters
          )]
    
          return products
    
       except CosmosHttpResponseError as e:
          logging.error(f"Cosmos DB query failed: {e}")
          raise
       except Exception as e:
          logging.error(f"Unexpected error in search_products: {e}")
          raise
    
  4. Voláním nově definované asynchronní funkce vrátíte výsledky dotazu.

    Buňka [6]:

       # Search for products in a category
       products = await search_products(categoryName="Computers, Laptops")
    
       display(products) #For tabular output
       pprint(products) #Json friendly output