Freigeben über


Arbeiten mit Cosmos DB in einem Python-Notizbuch in Microsoft Fabric

Sie können das Cosmos DB Python SDK in einem Python-Notizbuch in Microsoft Fabric verwenden, um Schreib- und Abfragedaten aus Cosmos DB in Microsoft Fabric zu lesen. Sie können auch Cosmos DB-Container erstellen und verwalten.

Die Verwendung des Spark-Connectors unterscheidet sich von der Verwendung von Spark, um Daten aus gespiegelten Fabric-Daten in der Cosmos DB zu lesen, die in OneLake gespeichert sind, da er eine direkte Verbindung zum Cosmos DB-Endpunkt herstellt, um Vorgänge auszuführen.

Tipp

Laden Sie das vollständige Beispiel von Cosmos DB in Microsoft Fabric Samples auf GitHub herunter.

Voraussetzungen

Hinweis

In diesem Artikel wird das integrierte Cosmos DB-Beispiel verwendet, das mit einem Containernamen von SampleData erstellt wurde.

Abrufen des Cosmos DB-Endpunkts

Rufen Sie zuerst den Endpunkt für die Cosmos DB-Datenbank in Fabric ab. Dieser Endpunkt ist erforderlich, um eine Verbindung mit dem Cosmos DB Spark Connector herzustellen.

  1. Öffnen Sie das Fabric-Portal (https://app.fabric.microsoft.com).

  2. Navigieren Sie zu Ihrer vorhandenen Cosmos DB-Datenbank.

  3. Wählen Sie die Einstellungsoption in der Menüleiste für die Datenbank aus.

    Screenshot der Menüleistenoption

  4. Navigieren Sie im Dialogfeld "Einstellungen" zum Abschnitt "Verbindung" . Kopieren Sie dann den Wert des Endpoints für die Cosmos DB NoSQL-Datenbank. Sie verwenden diesen Wert in einem späteren Schritt[s].

    Screenshot des Abschnitts

Installieren des Cosmos DB Python SDK-Pakets

  • Installieren Sie das Azure-Cosmos-Paket in Ihrem Notizbuch. Dies sollte Version 4.14.0 oder höher sein.

    Zelle [1]:

    #Install packages
    %pip install azure-cosmos
    

Importieren von Bibliotheken und Festlegen von Konfigurationswerten

  • Importieren Sie die Pakete in Ihr Notizbuch. In diesem und anderen Beispielen verwenden wir die asynchrone Bibliothek für Cosmos DB. Wenden Sie dann den Cosmos DB-Endpunkt, den Datenbanknamen und den Containernamen an, den Sie in einem vorherigen Schritt gespeichert haben.

    Zelle [2]:

    #Imports and config values
    import logging
    
    from azure.cosmos.aio import CosmosClient
    from azure.cosmos.exceptions import CosmosHttpResponseError
    
    COSMOS_ENDPOINT = 'https://my-cosmos-endpoint.cosmos.fabric.microsoft.com:443/'
    COSMOS_DATABASE_NAME = '{your-cosmos-artifact-name}'
    COSMOS_CONTAINER_NAME = '{your-container-name}'
    

Erstellen benutzerdefinierter Token-Zugangsdaten zur Authentifizierung

  1. Erstellen Sie ein FabricTokenCredential()-Objekt, um ein gültiges Anmeldeinformationsobjekt für das Cosmos DB SDK aus der Tokenzeichenfolge zu erzeugen, die von den Fabric NotebookUtils-Anmeldeinformationsprogrammen generiert wird, die zum Authentifizieren eines Benutzers erforderlich sind.

    [HINWEIS!] Microsoft Fabric-Notizbücher unterstützen keine nativen Azure-Anmeldeinformationen-Objekte. Sie können sich nicht mit DefaultAzureCredential() bei Cosmos DB innerhalb von Microsoft Fabric authentifizieren.

    Zelle [3]:

    # Custom TokenCredential implementation for Fabric authentication in a notebook
    %pip install azure-core
    from azure.core.credentials import TokenCredential, AccessToken
    import base64
    import json
    import notebookutils
    from datetime import datetime, timezone
    
    class FabricTokenCredential(TokenCredential):
    
       def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                      enable_cae: bool = False, **kwargs: Any) -> AccessToken:
          access_token = notebookutils.credentials.getToken("https://cosmos.azure.com/")
          parts = access_token.split(".")
          if len(parts) < 2:
                raise ValueError("Invalid JWT format")
          payload_b64 = parts[1]
          # Fix padding
          padding = (-len(payload_b64)) % 4
          if padding:
                payload_b64 += "=" * padding
          payload_json = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
          payload = json.loads(payload_json)
          exp = payload.get("exp")
          if exp is None:
                raise ValueError("exp claim missing in token")
          return AccessToken(token=access_token, expires_on=exp) 
    
  2. Erstellen Sie ein asynchrones Cosmos DB-Clientobjekt und einen Verweis auf den Cosmos DB-Container, der in einem Notizbuch verwendet werden soll.

    Zelle [4]:

    # Initialize Cosmos DB client with custom credential and container object
    COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
    DATABASE = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
    CONTAINER = DATABASE.get_container_client(COSMOS_CONTAINER_NAME)
    
  3. Erstellen einer asynchronen Funktion zum Abfragen des Cosmos DB-Containers

    Zelle [5]:

    #Define function to search for all products by category name
    async def search_products(categoryName: str) -> List[Dict[str, Any]]:
    
       try:
          # Use parameterized query
          query = """
             SELECT 
                *
             FROM c 
             WHERE 
                c.categoryName = @categoryName AND
                c.docType = @docType
          """
    
          # Set the parameter values
          parameters = [
                {"name": "@docType", "value": "product"},
                {"name": "@categoryName", "value": categoryName}
          ]
    
          # Async query: gather results into a list
          products = [p async for p in CONTAINER.query_items(
                query=query,
                parameters=parameters
          )]
    
          return products
    
       except CosmosHttpResponseError as e:
          logging.error(f"Cosmos DB query failed: {e}")
          raise
       except Exception as e:
          logging.error(f"Unexpected error in search_products: {e}")
          raise
    
  4. Rufen Sie die neu definierte asynchrone Funktion auf, um die Ergebnisse der Abfrage zurückzugeben.

    Zelle [6]:

       # Search for products in a category
       products = await search_products(categoryName="Computers, Laptops")
    
       display(products) #For tabular output
       pprint(products) #Json friendly output