Share via

How can I implement asynchronous loading into Azure Cosmos DB (Gremlin) with Python 3.10 correctly?

Maryna Paluyanava 211 Reputation points
Jun 23, 2024, 10:05 PM

Hello,

I have an Event Grid trigger Azure function that takes events, processes them, checks if a vertex exists in the Cosmos DB (Gremlin), and if not, adds this vertex to the database. When the number of events for processing is small, everything works properly. However, when the number of events increases, I get an error: “Unclosed client session client_session: <aiohttp.client.ClientSession object at 0x7f8a5c51eb60>” and “raise RuntimeError”.

I suspect this is because I did not implement asynchronous loading properly. So, I decided to modify my code (added async and wait).
Now it looks like this:

import logging
import azure.functions as func
import processing_functions as pf
from gremlin_python.driver import client, serializer
import os
import time
import asyncio


app = func.FunctionApp()
 
async def initialize_gremlin_client():
    # Initialize Gremlin client
    account_name = os.environ.get('CosmosDBAccount')
    database_id = os.environ.get('CosmosDBDatabaseID')
    graph_id = os.environ.get('CosmosDBGraphID')
    primary_key = os.environ.get('CosmosDBPrimaryKey')
 
    if not all([account_name, database_id, graph_id, primary_key]):
        logging.error("Missing Cosmos DB connection settings. Exiting.")
        return None
 
    gremlin_client = client.Client(
        url=f"wss://{account_name}.gremlin.cosmos.azure.com:443/",
        traversal_source="g",
        username=f"/dbs/{database_id}/colls/{graph_id}",
        password=f"{primary_key}",
        message_serializer=serializer.GraphSONSerializersV2d0()    )
    return gremlin_client
 
async def close_gremlin_client(gremlin_client):
   if gremlin_client:
        await gremlin_client.close()
 

@app.event_grid_trigger(arg_name="eventgridobject")
async def EventGridTriggerToDB(eventgridobject: func.EventGridEvent):
    
	gremlin_client=None
 
    # Read event data
    event_data = pf.read_event(eventgridobject)
 
    try:
        if event_data:
            data = event_data.get('data')
 
            # Initialize Gremlin client
            gremlin_client = await initialize_gremlin_client()
            if gremlin_client is None:
                logging.error("Failed to initialize Gremlin client.")
                return           
 
            # Loading objects
            await pf.process_object(data, gremlin_client)
 
    except Exception as e:
        err = str(e)
        logging.error(f"Error while sending an event {event_data}: {err}")
    finally:
        # Close the Gremlin client session
        await close_gremlin_client(gremlin_client)
 

 
async def process_object(data, gremlin_client):
    if not  await vertex_exists(data, gremlin_client):
        await insert_vertex(data, gremlin_client)
 

async def execute_gremlin_query(query, gremlin_client):
    callback = await gremlin_client.submitAsync(query)
    result = await callback.result().all().result()
    if result:
        return result
    return None
 
 
def read_event(eventgridobject):
    # Convert EventGridEvent to a dictionary
    event_data = {
        'id': eventgridobject.id,
        'data': eventgridobject.get_json(),
        'subject': eventgridobject.subject,
        'event_type': eventgridobject.event_type
    }
    return event_data
 
 
async def vertex_exists(object_data, gremlin_client):
     label = object_data.get('label')
     data_id = object_data.get('data_id')
     gremlin_query = f"g.V().hasLabel('{label}').has('data_id', '{data_id}').count()"
 
    # Execute the query
    result = await execute_gremlin_query(gremlin_query, gremlin_client)
   
    # Get the count from the result
    count = result[0]
    logging.info(f"Count for vertex is {count}")
   
    # Return True if count is greater than 0, indicating that the vertex exists
    return count > 0
 
 
 
async def insert_vertex(object_data, gremlin_client):
    logging.info(f"Inserting vertex... Data {object_data}")
    label = object_data.get('label')
 
     query = f"g.addV('{label}')"
     for prop_name, prop_value in object_data.items():
         if prop_name not in ['label']:
             query += f".property('{prop_name}', '{prop_value}')"
 
     query += ".property('pk', 'pk')"
 
    # Execute Gremlin query
    try:
        response = await execute_gremlin_query(query, gremlin_client)
        logging.info(f"\n>{query}")
        logging.info(f"Gremlin query (insert vertex) executed successfully: {response}.")
    except Exception as e:
        err = str(e)
        logging.error(f"Error executing Gremlin query (insert vertex): {err}\nquery: {query}")

But I still have errors: “Unclosed client session client_session: <aiohttp.client.ClientSession object at 0x7f8a5c51eb60>” and “raise RuntimeError”,  “Cannot run the event loop while another loop is running”.

Could anyone tell me where I might have made a mistake?

Many thanks!

Azure Functions
Azure Functions
An Azure service that provides an event-driven serverless compute platform.
5,303 questions
Azure Cosmos DB
Azure Cosmos DB
An Azure NoSQL database service for app development.
1,721 questions
0 comments No comments
{count} votes

1 answer

Sort by: Most helpful
  1. Sajeetharan 2,261 Reputation points Microsoft Employee
    Jun 24, 2024, 5:12 AM

    This has nothing to do with Azure Cosmos DB or gremlin API, it is very generic python error when you do asynchronous calls, you may misunderstand a way how asyncio works. You can resolve this by the following way!

    install the package :

    pip install nest-asyncio
    

    Then add these lines:

    import nest_asyncio
    nest_asyncio.apply()
    

Your answer

Answers can be marked as Accepted Answers by the question author, which helps users to know the answer solved the author's problem.