Issue with Azure Orchestration Function

Redar Ismail Chicho 1 Reputation point
2024-07-23T03:09:47.8733333+00:00

I am building an Azure Orchestration Function App that has four different functions. The first function is a start task function that removes containers inside a Cosmos DB database. The second function, which I am having a problem with, scans a Cosmos DB container that consists of multiple links and extracts some of the links. Then, it saves the links inside a new Cosmos DB container. Inside the Orchestration Function, I am using a for-loop that runs this function 4 times with different parameters, in reality, I will have around 9 or more: I want the App takes A, B, C, D and finishes all. Now it is going back and forth. Thank you for your help

This is my Orch Function:

[Function(nameof(CosmosDBFunction))]

public async Task
Azure Functions
Azure Functions
An Azure service that provides an event-driven serverless compute platform.
5,095 questions
0 comments No comments
{count} votes

1 answer

Sort by: Most helpful
  1. Sina Salam 12,011 Reputation points
    2024-07-23T16:22:26.49+00:00

    Hello Redar Ismail Chicho,

    Welcome to the Microsoft Q&A and thank you for posting your questions here and for lucid explanatory.

    Problem

    I understand that you would like to loop through the tasks A, B, C, and D completed and all be ready saved to cosmos db, and then we go to the third function.

    Solution

    After careful examining of your code and your desire output, the issue and some confusion with the asynchronous execution and the logging might be causing some delays. So, to ensure that tasks A, B, C, and D are completed before moving to the next function, you need to properly handle the orchestration and parallel execution of your tasks. I provided a reviewed code (refactor) version of your orchestration function with improved clarity:

    [Function(nameof(CosmosDBFunction))]
    public async Task<string> RunOrchestrator(
        [OrchestrationTrigger] TaskOrchestrationContext context)
    {
        ILogger logger = context.CreateReplaySafeLogger(nameof(CosmosDBFunction));
        logger.LogInformation("----Function Started-----");
        try
        {
            await context.CallActivityAsync("StartupTask");
        }
        catch (Exception ex)
        {
            logger.LogError($"Error Happened StartupTask: {ex.Message}");
        }
        var sites = new List<string> { "A", "B", "C", "D" };
        var parallelTasks = new List<Task<string>>();
        foreach (var site in sites)
        {
            try
            {
                var task = context.CallActivityAsync<string>("ScanSafetoAContainer", site);
                parallelTasks.Add(task);
            }
            catch (TaskFailedException ex) when (ex.InnerException is CosmosException)
            {
                logger.LogError($"Error calling for site {site}: {ex.Message}");
            }
            catch (TaskFailedException ex)
            {
                logger.LogError($"Error calling {site}: {ex.Message}");
            }
        }
        await Task.WhenAll(parallelTasks);
        try
        {
            await context.CallActivityAsync("SaveToAContainer");
        }
        catch (TaskFailedException ex)
        {
            logger.LogError($"Error happened in SaveToAContainer: {ex.Message}");
        }
        try
        {
            logger.LogInformation("----Function SaveToXContainer Started-----");
            await context.CallActivityAsync("SaveToXContainer");
        }
        catch (TaskFailedException ex)
        {
            logger.LogError($"Error happened in SaveToXContainer: {ex.Message}");
        }
        try
        {
            logger.LogInformation("----Function SaveToYContainer Started-----");
            await context.CallActivityAsync("SaveToYContainer");
        }
        catch (TaskFailedException ex)
        {
            logger.LogError($"Error happened in SaveToYContainer: {ex.Message}");
        }
        logger.LogInformation("----Function Finished-----");
        try
        {
            logger.LogInformation("Operation Is Completed");
            return "Operation Is Completed";
        }
        catch (Exception ex)
        {
            return $"Operation Is Completed: Exception at the End Line {ex.Message}";
        }
    }
    

    Also, for cleaned-up version of SaveAToContainer for better readability:

    [Function(nameof(SaveAToContainer))]
    [CosmosDBOutput(databaseName: "DB", containerName: "AB",
        Connection = "CosmosConnection", CreateIfNotExists = true, PartitionKey = "/something")]
    public async Task<string> SaveAToContainer(
        [ActivityTrigger] string site,
        FunctionContext executionContext)
    {
        var links = new ConcurrentBag<Object>();
        var logger = executionContext.GetLogger(nameof(SaveAToContainer));
        var container = await CreateContainer();
        var client = _httpClientFactory;
        client.Timeout = TimeSpan.FromSeconds(60);
        var pages = container.GetItemLinqQueryable<Page>()
            .Where(s => s.Prop == site)
            .ToFeedIterator<Object>();
        while (pages.HasMoreResults)
        {
            foreach (var page in await pages.ReadNextAsync())
            {
                try
                {
                    var response = await client.GetAsync(page.Url);
                    var htmlContent = await response.Content.ReadAsStringAsync();
                    var doc = new HtmlDocument();
                    doc.LoadHtml(htmlContent);
                    var anchorTags = doc.DocumentNode.SelectNodes("//div[@id='ga-maincontent']//a[@href]");
                    if (anchorTags != null)
                    {
                        foreach (var anchorTag in anchorTags)
                        {
                            var href = anchorTag.GetAttributeValue("href", string.Empty);
                            if (href.StartsWith("http"))
                            {
                                links.Add(new Link
                                {
                                    Id = $"{Guid.NewGuid()}{Interlocked.Increment(ref counter)}",
                                    PropA = site
                                });
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    links.Add(new Link
                    {
                        Id = $"{Guid.NewGuid()}{Interlocked.Increment(ref counter)}",
                        PropA = site
                    });
                    logger.LogError($"Http Request Exception: {ex.Message}");
                }
            }
        }
        return JsonConvert.SerializeObject(links);
    }
    

    NOTE:

    For the two code above, you will need to maintain the followings:

    In the Orchestration Function:

    • Make sure that all asynchronous calls are properly awaited, and that you're not missing any await keywords. This includes any methods that you call within your activity functions.
    • Logging inside loops and tasks can sometimes cause performance issues. Ensure logging is not excessively verbose, especially within loops.
    • Simplify your orchestration function to make it clearer and easier to debug.
    • Ensure that you are correctly managing concurrency and thread safety, especially with the use of ConcurrentBag.

    In the SaveAToContainer function:

    • Using ConcurrentBag is a good choice for thread safety. Ensure other operations that could be thread-sensitive are also thread-safe.
    • Make sure all asynchronous calls are properly awaited and handled.

    With the above you should be good! Tasks A, B, C, and D are completed before moving on to the next steps in your orchestration.

    Accept Answer

    I hope this is helpful! Do not hesitate to let me know if you have any other questions.

    ** Please don't forget to close up the thread here by upvoting and accept it as an answer if it is helpful ** so that others in the community facing similar issues can easily find the solution.

    Best Regards,

    Sina Salam

    0 comments No comments

Your answer

Answers can be marked as Accepted Answers by the question author, which helps users to know the answer solved the author's problem.