How to load all objects from Event Grid into Cosmos DB for Gremlin

Maryna Paluyanava 211 Reputation points
2024-05-04T00:59:05.6366667+00:00

Hello,

I created a blob_trigger Azure function that takes an input blob and retrieves objects from it. These objects are then sent to Event Grid. Additionally, there is a second event_grid_trigger Azure function that should take these objects and load them into Cosmos DB for Gremlin. Unfortunately, only the last object is uploaded into the database.
Could you please tell me if it is possible to load all objects that were sent into Event Grid from an input blob?

Thank you!

Below is the simplified version of these functions:

Function1

import azure.functions as func
import logging
import datetime
 
app = func.FunctionApp()
 
@app.blob_trigger(arg_name="inputblob",
                path="data/input/{name}",
                connection="BlobStorageConnectionString")
@app.event_grid_output(arg_name="outputevent",
                        topic_endpoint_uri="MyEventGridTopicUriSetting",
                        topic_key_setting="MyEventGridTopicKeySetting")
 
def blob_trigger(inputblob: func.InputStream, outputevent: func.Out[func.EventGridOutputEvent]):
    logging.info(f"Python blob trigger function processed blob"
                f"Name: {inputblob.name}"
                f"Blob Size: {inputblob.length} bytes")
 
    # These objects were created after processing the input blob
    data_list = [{"label": "person", "name": "Ann", "id": "AAAA1234"}, {"label": "person", "name": "Sam", "id": "AAAA1235"}]
 
    for dt in data_list:
        outputevent.set(
        func.EventGridOutputEvent(
            id=dt["id"],
            data=dt,
            subject="test-subject",
            event_type="person",
            event_time = datetime.datetime.utcnow(),
            data_version = "data_v1")
    )

Function2

import logging
import azure.functions as func
import json
from gremlin_python.driver import client, serializer


app = func.FunctionApp()

@app.event_grid_trigger(arg_name="eventgridobject")

def EventGridTrigger(eventgridobject: func.EventGridEvent):
    result = json.dumps({
        'id': eventgridobject.id,
        'data': eventgridobject.get_json(),
        'subject': eventgridobject.subject,
        'event_type': eventgridobject.event_type,
    })

    result_dict = json.loads(result)

    event_type = result_dict['event_type']
    id = result_dict['data']['id']
    name = result_dict['data']['name']

    account_name = "AAAA"
    database_id = "BBBB"
    graph_id = "CCCC"
    primary_key = "DDDD"

    _gremlin_insert_vertices = f"g.addV('{event_type}').property('id', '{id}').property('name', '{name}').property('pk', 'pk')"

    gremlin_client = client.Client(
                        url=f"wss://{account_name}.gremlin.cosmos.azure.com:443/",
                        traversal_source="g",
                        username=f"/dbs/{database_id}/colls/{graph_id}",
                        password=f"{primary_key}",
                        message_serializer=serializer.GraphSONSerializersV2d0(),
                    )

    try:
        callback = gremlin_client.submit(_gremlin_insert_vertices)
    except Exception as e:
        logging.error(f"Error executing Gremlin query: {e}")

Azure Functions
Azure Functions
An Azure service that provides an event-driven serverless compute platform.
4,352 questions
0 comments No comments
{count} votes

Accepted answer
  1. Amira Bedhiafi 16,071 Reputation points
    2024-05-09T22:41:42.5333333+00:00

    I think the last object is being processed because the function may not be handling the Event Grid events correctly or is overwriting previous objects.

    Here is my idea to adjust your code :

    In the 1st part, all objects are added to a list events and sent in one call to outputevent.set(events) :

    import azure.functions as func
    import logging
    import datetime
    
    app = func.FunctionApp()
    
    @app.blob_trigger(arg_name="inputblob",
                    path="data/input/{name}",
                    connection="BlobStorageConnectionString")
    @app.event_grid_output(arg_name="outputevent",
                            topic_endpoint_uri="MyEventGridTopicUriSetting",
                            topic_key_setting="MyEventGridTopicKeySetting")
    def blob_trigger(inputblob: func.InputStream, outputevent: func.Out[func.EventGridOutputEvent]):
        logging.info(f"Python blob trigger function processed blob"
                    f"Name: {inputblob.name}"
                    f"Blob Size: {inputblob.length} bytes")      
         data_list = [{"label": "person", "name": "Ann", "id": "AAAA1234"}, {"label": "person", "name": "Sam", "id": "AAAA1235"}]      events = []     for dt in data_list:
            events.append(func.EventGridOutputEvent(
                id=dt["id"],
                data=dt,
                subject="test-subject",
                event_type="person",
                event_time=datetime.datetime.utcnow(),
                data_version="data_v1"))
        
        outputevent.set(events)
    
    

    In the 2nd part, the EventGridTrigger will log and execute the Gremlin insert query for each event as it retrieves the id and name from the data payload and constructs the Gremlin query :

    import logging
    import azure.functions as func
    import json
    from gremlin_python.driver import client, serializer
    
    app = func.FunctionApp()
    
    @app.event_grid_trigger(arg_name="eventgridobject")
    def EventGridTrigger(eventgridobject: func.EventGridEvent):
        try:
            data = eventgridobject.get_json()
            event_type = eventgridobject.event_type
            id = data['id']
            name = data['name']
    
            account_name = "AAAA"
            database_id = "BBBB"
            graph_id = "CCCC"
            primary_key = "DDDD"
    
            _gremlin_insert_vertices = f"g.addV('{event_type}').property('id', '{id}').property('name', '{name}').property('pk', 'pk')"
    
            gremlin_client = client.Client(
                                url=f"wss://{account_name}.gremlin.cosmos.azure.com:443/",
                                traversal_source="g",
                                username=f"/dbs/{database_id}/colls/{graph_id}",
                                password=f"{primary_key}",
                                message_serializer=serializer.GraphSONSerializersV2d0(),
                            )
    
            logging.info(f"Executing Gremlin query: {_gremlin_insert_vertices}")
            gremlin_client.submit(_gremlin_insert_vertices)
        except Exception as e:
            logging.error(f"Error executing Gremlin query: {e}")
    
    

0 additional answers

Sort by: Most helpful