Issue with Azure Orchestration Function

Redar Ismail Chicho 1 Reputation point
2024-07-23T03:09:47.8733333+00:00

I am building an Azure orchestration Function App that has four different functions. The first function is a startup task that removes containers inside a Cosmos DB database. The second function, which is the one I am having a problem with, scans a Cosmos DB container that holds multiple links, extracts some of those links, and saves them in a new Cosmos DB container. Inside the orchestrator function, I use a for loop that runs this function four times with different parameters (in reality I will have around nine or more). I want the app to process A, B, C, and D and finish all of them, but right now it seems to go back and forth between them. Thank you for your help.

This is my orchestrator function:

[Function(nameof(CosmosDBFunction))]
public async Task<string> RunOrchestrator(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    ILogger logger = context.CreateReplaySafeLogger(nameof(CosmosDBFunction));
    logger.LogInformation("----Function Started-----");
    try
    {
        await context.CallActivityAsync("StartupTask");
    }
    catch (Exception ex)
    {
        logger.LogError($"Error Happened StartupTask: {ex.Message}");
    }

    var parallelTasks = new List<Task<string>>();
    var sites = new List<string>() {
        "A", "B", "C", "D"
    };

    for (int i = 0; i < sites.Count; i++)
    {
        try
        {
            var task = context.CallActivityAsync<string>("ScanSafetoAContainer", sites[i]);
            parallelTasks.Add(task);
        }
        catch (TaskFailedException ex) when (ex.InnerException is CosmosException cosmosException)
        {
            _logger.LogError($"Error calling for site {sites[i]}: {ex.Message}");
        }
        catch (TaskFailedException ex)
        {
            _logger.LogError($"Error calling {sites[i]}: {ex.Message}");
        }
    }

    await Task.WhenAll(parallelTasks);

    context.CallActivityAsync("SaveToAContainer");
    try
    {
        logger.LogInformation("----Function B Started-----");
        await context.CallActivityAsync("SaveToXContainer");
    }
    catch (TaskFailedException ex)
    {
        logger.LogError($"Error Happened : {ex.Message}");
    }
    try
    {
        logger.LogInformation("----Function clinksToBlinks Started-----");
        await context.CallActivityAsync("SaveToYContainer");
    }
    catch (TaskFailedException ex)
    {
        logger.LogError($"Error Happened: {ex.Message}");
    }
    logger.LogInformation("----Function Finished-----");
    try
    {
        logger.LogInformation($"Operation Is Completed");
        return "Operation Is Completed";
    }
    catch (Exception ex)
    {
        return $"Operation Is Completed: Exception at the End Line {ex.Message}";
    }
}

This is the function that has been called inside the loop:

[Function(nameof(Something))]
[CosmosDBOutput(databaseName: "DB", containerName: "AB",
      Connection = "CosmosConnection",
      CreateIfNotExists = true, PartitionKey = "/something")]
public async Task<string> SaveAToContainer(
    [ActivityTrigger] string site,
    FunctionContext executionContext)
{
    ConcurrentBag<Object> links = new ConcurrentBag<Object>();
    ILogger _logger = executionContext.GetLogger(nameof(Something));
    var container = await CreateContainer();
    var client = _httpClientFactory;
    client.Timeout = TimeSpan.FromSeconds(60);

    using FeedIterator<object> pages = container.GetItemLinqQueryable<Page>()
        .Where(s => s.prop == prop)
        .ToFeedIterator<Object>();
    while (pages.HasMoreResults)
    {
        long counter = 0;
        foreach (var page in await pages.ReadNextAsync())
        {
            string href = string.Empty;
            try
            {
                HttpResponseMessage responseFromUrl = await client.GetAsync(page.Url);
                var htmlContent = await responseFromUrl.Content.ReadAsStringAsync();
                var doc = new HtmlDocument();
                doc.LoadHtml(htmlContent);

                HtmlNode divNode = doc.DocumentNode.SelectSingleNode("//div[@id='ga-maincontent']");
                HtmlNodeCollection anchorTags = doc.DocumentNode
                    .SelectNodes("//div[@id='ga-maincontent']//a[@href]");
                if (anchorTags != null)
                {
                    foreach (HtmlNode anchorTag in anchorTags)
                    {
                        href = anchorTag.GetAttributeValue("href", string.Empty);
                        if (href.StartsWith("http"))
                        {
                            long uniqueId = Interlocked.Increment(ref counter);
                            links.Add(new Link
                            {
                                Id = $"{Guid.NewGuid().ToString()}{Interlocked.Increment(ref counter)}",
                                PropA = something
                            });
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                links.Add(new Link
                {
                    Id = $"{Guid.NewGuid().ToString()}{Interlocked.Increment(ref counter)}",
                    PropA = something
                });
                _logger.LogInformation($"Http Request Exception :" +
                    $" Message :{ex.Message} | Status Code {ex.Source} | {ex.Source} ");
            }

            //if (i >= pages.) break;
        }
    }

    string scannedLinks = string.Empty;
    try
    {
        scannedLinks = JsonConvert.SerializeObject(links);
    }
    catch (Exception ex)
    {
        _logger.LogInformation($"Parallel Func Insert Links : {ex.Message} ");
    }

    return scannedLinks;
}

I used to use a List instead of a ConcurrentBag, but then I suspected that List not being thread-safe was the reason the app does not execute as I expect. However, I believe I am using the async and await keywords correctly to make sure the app runs and waits for the tasks to complete.

What I really want is this: when I loop through the tasks, A, B, C, and D should all be completed and their results saved to Cosmos DB before we move on to the third function. Any suggestion about what I am doing wrong is much appreciated.

Azure Functions
.NET
Azure
C#

1 answer

  1. Sina Salam 7,286 Reputation points
    2024-07-23T16:22:26.49+00:00

    Hello Redar Ismail Chicho,

    Welcome to Microsoft Q&A, and thank you for posting your question here with such a clear explanation.

    Problem

    I understand that you want the loop to complete tasks A, B, C, and D, with all results saved to Cosmos DB, before the orchestration moves on to the third function.

    Solution

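    After examining your code and the output you want, the problem comes from how the orchestration awaits its tasks. The loop schedules A, B, C, and D without awaiting them one by one, so the four activities run in parallel and their work (and log output) interleaves. That is expected for this fan-out/fan-in pattern, and await Task.WhenAll(parallelTasks) still waits until all four have finished. However, context.CallActivityAsync("SaveToAContainer") is not awaited, so the orchestrator does not wait for it before it starts SaveToXContainer. To make sure that A, B, C, and D complete before the next function runs, schedule all four activities, await them together with Task.WhenAll, and await every activity call that follows. Here is a refactored version of your orchestrator function: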

    [Function(nameof(CosmosDBFunction))]
    public async Task<string> RunOrchestrator(
        [OrchestrationTrigger] TaskOrchestrationContext context)
    {
        // Replay-safe logger: each message is written once instead of on every replay.
        ILogger logger = context.CreateReplaySafeLogger(nameof(CosmosDBFunction));
        logger.LogInformation("----Function Started-----");
        try
        {
            await context.CallActivityAsync("StartupTask");
        }
        catch (Exception ex)
        {
            logger.LogError($"Error Happened StartupTask: {ex.Message}");
        }
        var sites = new List<string> { "A", "B", "C", "D" };
        // Fan-out: schedule one activity per site without awaiting it yet.
        // The activity called in the loop is SaveAToContainer (defined below).
        var parallelTasks = new List<Task<string>>();
        foreach (var site in sites)
        {
            parallelTasks.Add(context.CallActivityAsync<string>(nameof(SaveAToContainer), site));
        }
        // Fan-in: an activity failure is thrown here, when the tasks are awaited,
        // so the try/catch belongs around Task.WhenAll rather than inside the loop.
        try
        {
            await Task.WhenAll(parallelTasks);
        }
        catch (TaskFailedException)
        {
            // await Task.WhenAll rethrows only the first failure; check each task to log all of them.
            for (int i = 0; i < sites.Count; i++)
            {
                if (parallelTasks[i].IsFaulted)
                {
                    logger.LogError($"Error scanning site {sites[i]}: {parallelTasks[i].Exception?.InnerException?.Message}");
                }
            }
        }
        // Every following activity is awaited, so each one finishes before the next starts.
        try
        {
            await context.CallActivityAsync("SaveToAContainer");
        }
        catch (TaskFailedException ex)
        {
            logger.LogError($"Error happened in SaveToAContainer: {ex.Message}");
        }
        try
        {
            logger.LogInformation("----Function SaveToXContainer Started-----");
            await context.CallActivityAsync("SaveToXContainer");
        }
        catch (TaskFailedException ex)
        {
            logger.LogError($"Error happened in SaveToXContainer: {ex.Message}");
        }
        try
        {
            logger.LogInformation("----Function SaveToYContainer Started-----");
            await context.CallActivityAsync("SaveToYContainer");
        }
        catch (TaskFailedException ex)
        {
            logger.LogError($"Error happened in SaveToYContainer: {ex.Message}");
        }
        logger.LogInformation("----Function Finished-----");
        logger.LogInformation("Operation Is Completed");
        return "Operation Is Completed";
    }
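    The effect of the missing await on SaveToAContainer in your original orchestrator can be reproduced outside Azure with a small console sketch. RunStepAsync is a hypothetical stand-in for an activity call that only records when a step starts and ends; it models the ordering of awaits, not how Durable Functions actually schedules activities.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for an activity call (not the Durable Functions API):
// it records when the step starts and when it ends.
static async Task RunStepAsync(string name, List<string> log, int workMs)
{
    lock (log) { log.Add($"{name} start"); }
    await Task.Delay(workMs);
    lock (log) { log.Add($"{name} end"); }
}

// Without await: SaveToX starts while SaveToA is still running.
var notAwaited = new List<string>();
Task saveA = RunStepAsync("SaveToA", notAwaited, workMs: 300);
await RunStepAsync("SaveToX", notAwaited, workMs: 10);
await saveA; // awaited here only so the demo does not exit before SaveToA ends

// With await: SaveToX starts only after SaveToA has ended.
var awaited = new List<string>();
await RunStepAsync("SaveToA", awaited, workMs: 300);
await RunStepAsync("SaveToX", awaited, workMs: 10);

Console.WriteLine("Without await: " + string.Join(", ", notAwaited));
Console.WriteLine("With await:    " + string.Join(", ", awaited));
```

    In the first run "SaveToX start" is logged before "SaveToA end"; in the second run the two steps never overlap, which is the behavior you want between SaveToAContainer and SaveToXContainer.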
    

    Here is also a cleaned-up version of SaveAToContainer, the activity called inside the loop:

    [Function(nameof(SaveAToContainer))]
    [CosmosDBOutput(databaseName: "DB", containerName: "AB",
        Connection = "CosmosConnection", CreateIfNotExists = true, PartitionKey = "/something")]
    public async Task<string> SaveAToContainer(
        [ActivityTrigger] string site,
        FunctionContext executionContext)
    {
        // Local to this invocation. ConcurrentBag is thread-safe, but a List<Link> would also
        // work, because the loops below await each request before starting the next one.
        var links = new ConcurrentBag<Link>();
        var logger = executionContext.GetLogger(nameof(SaveAToContainer));
        var container = await CreateContainer();
        // Assumes _httpClientFactory is an injected IHttpClientFactory. CreateClient returns a new
        // HttpClient instance, so setting Timeout on it does not affect other callers.
        var client = _httpClientFactory.CreateClient();
        client.Timeout = TimeSpan.FromSeconds(60);
        // ToFeedIterator() without a type argument keeps the Page type, so page.Url is available.
        using var pages = container.GetItemLinqQueryable<Page>()
            .Where(s => s.Prop == site)
            .ToFeedIterator();
        while (pages.HasMoreResults)
        {
            foreach (var page in await pages.ReadNextAsync())
            {
                try
                {
                    var response = await client.GetAsync(page.Url);
                    var htmlContent = await response.Content.ReadAsStringAsync();
                    var doc = new HtmlDocument();
                    doc.LoadHtml(htmlContent);
                    var anchorTags = doc.DocumentNode.SelectNodes("//div[@id='ga-maincontent']//a[@href]");
                    if (anchorTags != null)
                    {
                        foreach (var anchorTag in anchorTags)
                        {
                            var href = anchorTag.GetAttributeValue("href", string.Empty);
                            if (href.StartsWith("http", StringComparison.OrdinalIgnoreCase))
                            {
                                // A GUID is already unique, so no counter suffix is needed.
                                links.Add(new Link { Id = Guid.NewGuid().ToString(), PropA = site });
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    links.Add(new Link { Id = Guid.NewGuid().ToString(), PropA = site });
                    logger.LogError(ex, "HTTP request for {Url} failed", page.Url);
                }
            }
        }
        return JsonConvert.SerializeObject(links);
    }
    

    NOTE:

    For the two code samples above, keep the following in mind:

    In the Orchestration Function:

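    • Make sure every asynchronous call is awaited. In your original orchestrator, context.CallActivityAsync("SaveToAContainer") had no await, so SaveToXContainer could start before SaveToAContainer had finished.
    • Inside the orchestrator, log only through the replay-safe logger returned by context.CreateReplaySafeLogger. The orchestrator function re-runs from the top each time it wakes up to process new activity results, so messages written through any other logger, such as _logger in your loop's catch blocks, are repeated on every replay.
    • Put the try/catch around the await, not around CallActivityAsync. An activity failure is thrown only when its task is awaited (here, at Task.WhenAll), so a try/catch around the unawaited call never runs.
    • Keep the orchestrator simple so it is easier to debug: let it schedule and await activities, and keep the Cosmos DB and HTTP work inside the activities.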
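    To see why the placement of the try/catch matters, here is a self-contained console sketch. ScanSiteAsync is a hypothetical stand-in for context.CallActivityAsync<string> whose scan of site "C" fails; in Durable Functions the orchestrator would see that failure as a TaskFailedException, and the sketch uses InvalidOperationException in its place.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for context.CallActivityAsync<string>: the scan of site "C" fails.
static async Task<string> ScanSiteAsync(string site)
{
    await Task.Delay(10);
    if (site == "C")
    {
        throw new InvalidOperationException($"Scan of site {site} failed");
    }
    return $"links for {site}";
}

var sites = new List<string> { "A", "B", "C", "D" };
var tasks = new List<Task<string>>();
int caughtInLoop = 0;

foreach (var site in sites)
{
    try
    {
        // This only starts the work. An async method reports its failure through
        // the returned Task, so this catch block never runs.
        tasks.Add(ScanSiteAsync(site));
    }
    catch (InvalidOperationException)
    {
        caughtInLoop++;
    }
}

var failedSites = new List<string>();
try
{
    // The failure surfaces here, when the tasks are awaited.
    await Task.WhenAll(tasks);
}
catch (InvalidOperationException)
{
    // await Task.WhenAll rethrows only the first failure, so check every task.
    for (int i = 0; i < sites.Count; i++)
    {
        if (tasks[i].IsFaulted)
        {
            failedSites.Add(sites[i]);
        }
    }
}

Console.WriteLine($"Caught in loop: {caughtInLoop}; failed sites: {string.Join(", ", failedSites)}");
// prints: Caught in loop: 0; failed sites: C
```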

    In the SaveAToContainer function:

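    • ConcurrentBag is thread-safe, but it is not required here: links is a local variable of each activity invocation, and the foreach loop awaits every request before moving on, so only one piece of code adds to it at a time. A plain List<Link> works just as well; a thread-safe collection matters only if you start processing pages in parallel.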
    • Make sure all asynchronous calls are properly awaited and handled.
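    The difference between the sequential loop used in SaveAToContainer and a parallel version can be sketched with a small console program. ExtractLinksAsync and the example.com URLs are hypothetical placeholders for downloading a page and extracting its links; the point is that neither version needs a shared thread-safe collection.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for downloading one page and extracting its links.
static async Task<List<string>> ExtractLinksAsync(string pageUrl)
{
    await Task.Delay(5);
    return new List<string> { $"{pageUrl}#a", $"{pageUrl}#b" };
}

var pageUrls = new[] { "https://example.com/1", "https://example.com/2", "https://example.com/3" };

// Sequential (what SaveAToContainer does): each await completes before the next
// iteration, so only one piece of code adds to the list at a time. List<T> is safe.
var sequentialLinks = new List<string>();
foreach (var url in pageUrls)
{
    sequentialLinks.AddRange(await ExtractLinksAsync(url));
}

// Parallel: start every request, then let Task.WhenAll gather the results.
// Each task returns its own list, so no shared thread-safe collection is needed.
List<string>[] perPage = await Task.WhenAll(pageUrls.Select(url => ExtractLinksAsync(url)));
var parallelLinks = perPage.SelectMany(links => links).ToList();

Console.WriteLine($"Sequential: {sequentialLinks.Count} links, parallel: {parallelLinks.Count} links");
// prints: Sequential: 6 links, parallel: 6 links
```

    Task.WhenAll returns its results in the same order as the input tasks, so both approaches produce the links in the same order.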

    With these changes, the orchestrator waits for tasks A, B, C, and D to finish before moving on to the next steps in your orchestration.
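    If you instead need the sites processed strictly one after another (A, then B, then C, then D), await each activity inside the loop; Durable Functions calls this the function-chaining pattern, while collecting the tasks and awaiting Task.WhenAll is fan-out/fan-in. The console sketch below compares the two orderings; ScanSiteAsync is a hypothetical stand-in for the per-site activity, not the Durable Functions API.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the per-site activity: records when each site starts and ends.
static async Task<string> ScanSiteAsync(string site, List<string> log)
{
    lock (log) { log.Add($"{site} start"); }
    await Task.Delay(20);
    lock (log) { log.Add($"{site} end"); }
    return site;
}

var sites = new[] { "A", "B", "C", "D" };

// Chaining: await inside the loop, so B starts only after A has ended, and so on.
var chained = new List<string>();
foreach (var site in sites)
{
    await ScanSiteAsync(site, chained);
}

// Fan-out/fan-in: start all four, then wait for every one of them to end.
var fannedOut = new List<string>();
var tasks = new List<Task<string>>();
foreach (var site in sites)
{
    tasks.Add(ScanSiteAsync(site, fannedOut));
}
await Task.WhenAll(tasks);
fannedOut.Add("next step"); // runs only after all four sites have ended

Console.WriteLine("Chained:    " + string.Join(", ", chained));
Console.WriteLine("Fanned out: " + string.Join(", ", fannedOut));
```

    With fan-out the start and end entries of different sites interleave, which is why the log can look like it goes back and forth, but "next step" is still logged only after every site has ended.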


    I hope this is helpful! Do not hesitate to let me know if you have any other questions.

    ** Please don't forget to close the thread by upvoting and accepting this as an answer if it is helpful ** so that others in the community facing similar issues can easily find the solution.

    Best Regards,

    Sina Salam
