Compartilhar via


Trabalhar com o Cosmos DB em um notebook Python no Microsoft Fabric

Você pode usar o SDK Python do Cosmos DB em um notebook Python no Microsoft Fabric para ler, gravar e consultar dados do Cosmos DB no Microsoft Fabric. Você também pode criar e gerenciar contêineres do Cosmos DB.

Usar o conector do Spark é diferente de usar o Spark para ler dados do Cosmos DB no Fabric, onde os dados espelhados estão armazenados no OneLake, pois ele se conecta diretamente ao endpoint do Cosmos DB para executar operações.

Dica

Baixe o exemplo completo do Cosmos DB em Exemplos do Microsoft Fabric no GitHub.

Pré-requisitos

Observação

Este artigo usa o exemplo interno do Cosmos DB criado com um nome de contêiner de SampleData.

Recuperar endpoint do Cosmos DB

Primeiro, obtenha o ponto de extremidade do banco de dados do Cosmos DB no Fabric. O endpoint é necessário para conectar usando o Conector Spark do Cosmos DB.

  1. Abra o portal do Fabric (https://app.fabric.microsoft.com).

  2. Navegue até o banco de dados do Cosmos DB existente.

  3. Selecione a opção Configurações na barra de menus do banco de dados.

    Captura de tela da opção de barra de menus 'Configurações' para um banco de dados no portal do Fabric.

  4. Na caixa de diálogo configurações, navegue até a seção Conexão . Em seguida, copie o valor do campo de Endpoint para banco de dados NoSQL do Cosmos DB. Use esse valor nas etapas posteriores.

    Captura de tela da seção 'Conexão' da caixa de diálogo 'Configurações' de um banco de dados no portal do Fabric.

Instalar o pacote do SDK do Python do Cosmos DB

  • Instale o pacote do azure-cosmos em seu notebook. Essa deve ser a versão 4.14.0 ou posterior.

    Célula [1]:

    #Install packages
    %pip install azure-cosmos
    

Importar bibliotecas e definir valores de configuração

  • Importe os pacotes para o bloco de anotações. Neste e em outros exemplos, usamos a biblioteca assíncrona para o Cosmos DB. Em seguida, aplique o ponto de extremidade do Cosmos DB, o nome do banco de dados e o nome do contêiner que você salvou em uma etapa anterior.

    Célula [2]:

    #Imports and config values
    import logging
    
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos.exceptions import CosmosHttpResponseError
    
    COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
    COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
    COSMOS_CONTAINER_NAME = '{your-container-name}'
    

Criar uma credencial de token personalizada para autenticar

  1. Crie um objeto FabricTokenCredential() para produzir um objeto de credencial válido para o SDK do Cosmos DB a partir da cadeia de caracteres de token gerada pelos utilitários de credencial Fabric NotebookUtils , que é necessário para autenticar um usuário.

    [OBSERVAÇÃO!] Os notebooks do Microsoft Fabric não dão suporte a objetos de Credencial do Azure nativamente. Você não pode usar DefaultAzureCredential() para autenticar no Cosmos DB no Microsoft Fabric.

    Célula [3]:

    # Custom TokenCredential implementation for Fabric authentication in a notebook
    %pip install azure-core
    from azure.core.credentials import TokenCredential, AccessToken
    import base64
    import json
    import notebookutils
    from datetime import datetime, timezone
    
    class FabricTokenCredential(TokenCredential):
    
       def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                      enable_cae: bool = False, **kwargs: Any) -> AccessToken:
          access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")
          parts = access_token.split(".")
          if len(parts) < 2:
                raise ValueError("Invalid JWT format")
          payload_b64 = parts[1]
          # Fix padding
          padding = (-len(payload_b64)) % 4
          if padding:
                payload_b64 += "=" * padding
          payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
          payload = json.loads(payload_json)
          exp = payload.get("exp")
          if exp is None:
                raise ValueError("exp claim missing in token")
          return AccessToken(token=access_token, expires_on=exp) 
    
  2. Crie um objeto cliente assíncrono do Cosmos DB e uma referência ao contêiner do Cosmos DB a ser usado em um notebook.

    Célula [4]:

    # Initialize Cosmos DB client with custom credential and container object
    COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
    DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
    CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)
    
  3. Criar uma função assíncrona para consultar o contêiner do Cosmos DB

    Célula [5]:

    #Define function to search for all products by category name
    async def search_products(categoryName: str) -> List[Dict[str, Any]]:
    
       try:
          # Use parameterized query
          query = """
             SELECT 
                *
             FROM c 
             WHERE 
                c.categoryName = @categoryName AND
                c.docType = @docType
          """
    
          # Set the parameter values
          parameters = [
                {"name": "@docType", "value": "product"},
                {"name": "@categoryName", "value": categoryName}
          ]
    
          # Async query: gather results into a list
          products = [p async for p in CONTAINER.query_items(
                query=query,
                parameters=parameters
          )]
    
          return products
    
       except CosmosHttpResponseError as e:
          logging.error(f"Cosmos DB query failed: {e}")
          raise
       except Exception as e:
          logging.error(f"Unexpected error in search_products: {e}")
          raise
    
  4. Chame a função assíncrona recém-definida para retornar os resultados da consulta

    Célula [6]:

       # Search for products in a category
       products = await search_products(categoryName="Computers, Laptops")
    
       display(products) #For tabular output
       pprint(products) #Json friendly output