Hello,
I have an Event Grid trigger Azure function that takes events, processes them, checks if a vertex exists in the Cosmos DB (Gremlin), and if not, adds this vertex to the database. When the number of events for processing is small, everything works properly. However, when the number of events increases, I get an error: “Unclosed client session client_session: <aiohttp.client.ClientSession object at 0x7f8a5c51eb60>” and “raise RuntimeError”.
I suspect this is because I did not implement asynchronous loading properly. So, I decided to modify my code (added async and wait).
Now it looks like this:
import logging
import azure.functions as func
import processing_functions as pf
from gremlin_python.driver import client, serializer
import os
import time
import asyncio
app = func.FunctionApp()
async def initialize_gremlin_client():
# Initialize Gremlin client
account_name = os.environ.get('CosmosDBAccount')
database_id = os.environ.get('CosmosDBDatabaseID')
graph_id = os.environ.get('CosmosDBGraphID')
primary_key = os.environ.get('CosmosDBPrimaryKey')
if not all([account_name, database_id, graph_id, primary_key]):
logging.error("Missing Cosmos DB connection settings. Exiting.")
return None
gremlin_client = client.Client(
url=f"wss://{account_name}.gremlin.cosmos.azure.com:443/",
traversal_source="g",
username=f"/dbs/{database_id}/colls/{graph_id}",
password=f"{primary_key}",
message_serializer=serializer.GraphSONSerializersV2d0() )
return gremlin_client
async def close_gremlin_client(gremlin_client):
if gremlin_client:
await gremlin_client.close()
@app.event_grid_trigger(arg_name="eventgridobject")
async def EventGridTriggerToDB(eventgridobject: func.EventGridEvent):
gremlin_client=None
# Read event data
event_data = pf.read_event(eventgridobject)
try:
if event_data:
data = event_data.get('data')
# Initialize Gremlin client
gremlin_client = await initialize_gremlin_client()
if gremlin_client is None:
logging.error("Failed to initialize Gremlin client.")
return
# Loading objects
await pf.process_object(data, gremlin_client)
except Exception as e:
err = str(e)
logging.error(f"Error while sending an event {event_data}: {err}")
finally:
# Close the Gremlin client session
await close_gremlin_client(gremlin_client)
async def process_object(data, gremlin_client):
if not await vertex_exists(data, gremlin_client):
await insert_vertex(data, gremlin_client)
async def execute_gremlin_query(query, gremlin_client):
callback = await gremlin_client.submitAsync(query)
result = await callback.result().all().result()
if result:
return result
return None
def read_event(eventgridobject):
# Convert EventGridEvent to a dictionary
event_data = {
'id': eventgridobject.id,
'data': eventgridobject.get_json(),
'subject': eventgridobject.subject,
'event_type': eventgridobject.event_type
}
return event_data
async def vertex_exists(object_data, gremlin_client):
label = object_data.get('label')
data_id = object_data.get('data_id')
gremlin_query = f"g.V().hasLabel('{label}').has('data_id', '{data_id}').count()"
# Execute the query
result = await execute_gremlin_query(gremlin_query, gremlin_client)
# Get the count from the result
count = result[0]
logging.info(f"Count for vertex is {count}")
# Return True if count is greater than 0, indicating that the vertex exists
return count > 0
async def insert_vertex(object_data, gremlin_client):
logging.info(f"Inserting vertex... Data {object_data}")
label = object_data.get('label')
query = f"g.addV('{label}')"
for prop_name, prop_value in object_data.items():
if prop_name not in ['label']:
query += f".property('{prop_name}', '{prop_value}')"
query += ".property('pk', 'pk')"
# Execute Gremlin query
try:
response = await execute_gremlin_query(query, gremlin_client)
logging.info(f"\n>{query}")
logging.info(f"Gremlin query (insert vertex) executed successfully: {response}.")
except Exception as e:
err = str(e)
logging.error(f"Error executing Gremlin query (insert vertex): {err}\nquery: {query}")
But I still have errors: “Unclosed client session client_session: <aiohttp.client.ClientSession object at 0x7f8a5c51eb60>” and “raise RuntimeError”, “Cannot run the event loop while another loop is running”.
Could anyone tell me where I might have made a mistake?
Many thanks!