Azure Function keeps running and inserts 10x the values into the database

Mohammad, Khaja Moinuddin 41 Reputation points
2022-10-23T04:13:21.483+00:00

I have built a pipeline in which Stream Analytics output triggers an Azure Function.

Each payload contains 5,000 values merged into a single message. I wrote a simple Python program in the Function to validate the data, parse the bulk message, and save each value as an individual document in Cosmos DB. The problem is that my Function doesn't stop: after 30 minutes it fails with a timeout error, and in those 30 minutes more than 300k values appear in my database, duplicating themselves. I thought the problem was in my code (the for loop), but when I run it locally everything works, so I am not sure what is wrong. The only statement in the whole code I don't fully understand is the container.upsert line.

This is my code:

import logging
import hashlib as h
import random
import string

import azure.functions as func
from azure.cosmos import CosmosClient


def generateRandomID(length):
    # Build a random document id from lowercase letters
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for _ in range(length))


# Placeholders for the real connection settings (redacted here)
URL = dburl
KEY = dbkey
client = CosmosClient(URL, credential=KEY)

DATABASE_NAME = dbname
database = client.get_database_client(DATABASE_NAME)
CONTAINER_NAME = containername
container = database.get_container_client(CONTAINER_NAME)


def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')
    req_body = req.get_json()

    try:
        # Level 1: extract the message and the MD5 checksum carried in the metadata
        rawMsg = req_body[0]
        filteredMsg = rawMsg['message']
        metaData = rawMsg['metaData']
        logging.info(metaData)

        # Checksum computed locally over the message body
        generateMD5 = h.md5(filteredMsg.encode('utf-8')).hexdigest()

        # Checksum sent in the metadata, e.g. "...,md5:<hex>,..."
        parsingMetaData = metaData.split(',')
        parsingMD5Hex = parsingMetaData[3]
        parsingMD5Value = parsingMD5Hex.split(':')[1]

    except Exception:
        logging.exception("Failed to parse the data and generate MD5 checksums (level 1)")
        return func.HttpResponse("Bad payload", status_code=400)

    logging.info("Execution successful | First level completed")

    try:
        # Level 2: parse the ECG values and write one document per value
        if generateMD5 == parsingMD5Value:
            logging.info('MD5 checksums matched!')
            for ecgRawData in filteredMsg.split(','):
                timeData, ecgData = ecgRawData.split(':')
                # A fresh random id on every call means upsert_item always
                # creates a new document rather than replacing an existing one.
                container.upsert_item({'id': generateRandomID(10),
                                       'time': timeData,
                                       'ecgData': ecgData})
        else:
            logging.info('The MD5 checksums did not match; skipping the insert.')
            logging.info(generateMD5)

    except Exception:
        logging.exception("Failed to write ECG values into the container (level 2)")
    else:
        logging.info("Execution successful | Second level completed")

    # Return a 200 status
    return func.HttpResponse("OK")

A test I performed: I commented out the for loop block and deployed the Function; it executes normally without any error.

Azure Functions: An Azure service that provides an event-driven serverless compute platform.
Azure Stream Analytics: An Azure real-time analytics service designed for mission-critical workloads.

Accepted answer
  1. MughundhanRaveendran-MSFT 12,476 Reputation points
    2022-11-02T08:56:52.523+00:00

    Hi @Mohammad, Khaja Moinuddin ,

    The container.upsert_item call inserts the item into the Azure Cosmos DB container, or replaces it if a document with the same id already exists. From your scenario, it is possible that Cosmos DB is being throttled. I have seen this kind of issue before, and increasing the throughput on the Cosmos DB container resolved it. So I would suggest you increase the throughput on the Cosmos DB container and see if that helps.
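
    If throttling is the cause, the writes are rejected with HTTP 429 ("Request rate too large"). As a rough sketch (assuming the azure-cosmos v4 SDK you are already using, and with ecgDocument standing in for one of the documents built in your loop), you could surface that in the Function logs around the upsert call:

        from azure.cosmos import exceptions

        try:
            container.upsert_item(ecgDocument)  # ecgDocument: placeholder for one document from the loop
        except exceptions.CosmosHttpResponseError as e:
            if e.status_code == 429:
                logging.warning("Cosmos DB throttled this request (429 Request rate too large)")
            raise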

    https://learn.microsoft.com/en-us/azure/cosmos-db/set-throughput#set-throughput-on-a-database-and-a-container
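
    For reference, here is a minimal sketch of checking and raising the provisioned throughput from the same Python SDK. It assumes manual throughput is provisioned on the container itself (not on the database or via autoscale) and reuses the placeholder settings from your code:

        from azure.cosmos import CosmosClient

        client = CosmosClient(dburl, credential=dbkey)
        container = client.get_database_client(dbname).get_container_client(containername)

        offer = container.read_offer()                  # current provisioned throughput
        print("Provisioned RU/s:", offer.offer_throughput)
        container.replace_throughput(1000)              # e.g. raise the container to 1000 RU/s

    You can also change the RU/s in the portal under the container's Scale & Settings.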

    Hope this helps! Feel free to reach out to me if you have any questions or concerns.


1 additional answer

  1. Mohammad, Khaja Moinuddin 41 Reputation points
    2022-11-04T18:28:01.61+00:00

    I found the solution! (I am the OP)

    In my resource group, an App Service plan was already in use for a web application. So when creating the Azure Function, it wouldn't let me deploy it with the Serverless (Consumption) option, and I deployed it on the same App Service plan used by the web application. While testing, the Function works completely except for the container.upsert line: when I add this line, it never stops and keeps creating 10x the values in the database until it is stopped by a timeout error after 30 minutes.

    I tried creating an App Service plan dedicated to this Function, but the issue was still the same.

    While testing hundreds of corner-case scenarios, I found that my Function runs perfectly when I deploy it in another resource group. The only difference is that there I opted for the Serverless option when deploying the Function.

    (If you are using an App Service plan in your Azure resource group, you cannot deploy an Azure Function with the Serverless option; the deployment does not work properly. You need to create a dedicated App Service plan for that Function or use the existing App Service plan.)

    As per my research, when dealing with bulk data and inserting it into the database, the usual App Service plan doesn't work; the App Service plan has to be large enough to sustain the load. Otherwise, you should choose the Serverless option when deploying the Function, since the compute is then fully managed by Azure.

    Hope this helps.

