Hello Redar Ismail Chicho,
Welcome to Microsoft Q&A, and thank you for posting your question here with such a clear explanation.
Problem
I understand that you would like tasks A, B, C, and D to run to completion and be saved to Cosmos DB before the orchestration moves on to the third function.
Solution
After carefully examining your code and the output you want, it looks like the asynchronous execution is not being handled correctly, and the logging may be adding some delay. To ensure that tasks A, B, C, and D are completed before moving on to the next function, the orchestrator needs to schedule them in parallel and then await all of them together. Below is a refactored version of your orchestration function with improved clarity:
[Function(nameof(CosmosDBFunction))]
public async Task<string> RunOrchestrator(
    [OrchestrationTrigger] TaskOrchestrationContext context)
{
    // Replay-safe logger: avoids duplicate log lines when the orchestrator replays.
    ILogger logger = context.CreateReplaySafeLogger(nameof(CosmosDBFunction));
    logger.LogInformation("----Function Started-----");

    try
    {
        await context.CallActivityAsync("StartupTask");
    }
    catch (Exception ex)
    {
        logger.LogError($"Error Happened StartupTask: {ex.Message}");
    }

    // Fan out: schedule one activity per site without awaiting, so they run in parallel.
    var sites = new List<string> { "A", "B", "C", "D" };
    var parallelTasks = new List<Task<string>>();
    foreach (var site in sites)
    {
        parallelTasks.Add(context.CallActivityAsync<string>("ScanSafetoAContainer", site));
    }

    // Fan in: activity failures surface here, when the tasks are awaited,
    // not at the CallActivityAsync call that only schedules them.
    try
    {
        await Task.WhenAll(parallelTasks);
    }
    catch (TaskFailedException)
    {
        // The await rethrows only the first failure, so log every faulted site.
        for (var i = 0; i < parallelTasks.Count; i++)
        {
            if (parallelTasks[i].IsFaulted)
            {
                var message = parallelTasks[i].Exception?.GetBaseException().Message;
                logger.LogError($"Error calling {sites[i]}: {message}");
            }
        }
    }

    // The remaining steps run one after another, only after A, B, C, and D have finished.
    try
    {
        await context.CallActivityAsync("SaveToAContainer");
    }
    catch (TaskFailedException ex)
    {
        logger.LogError($"Error happened in SaveToAContainer: {ex.Message}");
    }

    try
    {
        logger.LogInformation("----Function SaveToXContainer Started-----");
        await context.CallActivityAsync("SaveToXContainer");
    }
    catch (TaskFailedException ex)
    {
        logger.LogError($"Error happened in SaveToXContainer: {ex.Message}");
    }

    try
    {
        logger.LogInformation("----Function SaveToYContainer Started-----");
        await context.CallActivityAsync("SaveToYContainer");
    }
    catch (TaskFailedException ex)
    {
        logger.LogError($"Error happened in SaveToYContainer: {ex.Message}");
    }

    logger.LogInformation("----Function Finished-----");
    logger.LogInformation("Operation Is Completed");
    return "Operation Is Completed";
}
Here is also a cleaned-up version of SaveAToContainer for better readability:
[Function(nameof(SaveAToContainer))]
[CosmosDBOutput(databaseName: "DB", containerName: "AB",
    Connection = "CosmosConnection", CreateIfNotExists = true, PartitionKey = "/something")]
public async Task<string> SaveAToContainer(
    [ActivityTrigger] string site,
    FunctionContext executionContext)
{
    var links = new ConcurrentBag<Object>();
    var logger = executionContext.GetLogger(nameof(SaveAToContainer));
    var container = await CreateContainer();

    // Assumes _httpClientFactory is an injected IHttpClientFactory.
    var client = _httpClientFactory.CreateClient();
    client.Timeout = TimeSpan.FromSeconds(60);

    // Keep the iterator typed as Page so that page.Url is available below.
    var pages = container.GetItemLinqQueryable<Page>()
        .Where(s => s.Prop == site)
        .ToFeedIterator();

    while (pages.HasMoreResults)
    {
        foreach (var page in await pages.ReadNextAsync())
        {
            try
            {
                var response = await client.GetAsync(page.Url);
                var htmlContent = await response.Content.ReadAsStringAsync();
                var doc = new HtmlDocument();
                doc.LoadHtml(htmlContent);

                // Collect every absolute link inside the main content div.
                var anchorTags = doc.DocumentNode.SelectNodes("//div[@id='ga-maincontent']//a[@href]");
                if (anchorTags != null)
                {
                    foreach (var anchorTag in anchorTags)
                    {
                        var href = anchorTag.GetAttributeValue("href", string.Empty);
                        if (href.StartsWith("http"))
                        {
                            // counter is assumed to be an int field declared on the class.
                            links.Add(new Link
                            {
                                Id = $"{Guid.NewGuid()}{Interlocked.Increment(ref counter)}",
                                PropA = site
                            });
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                // Record a placeholder entry for the failed page and keep going.
                links.Add(new Link
                {
                    Id = $"{Guid.NewGuid()}{Interlocked.Increment(ref counter)}",
                    PropA = site
                });
                logger.LogError($"Http Request Exception: {ex.Message}");
            }
        }
    }

    return JsonConvert.SerializeObject(links);
}
NOTE:
For the two code samples above, keep the following in mind:
In the orchestration function:
- Make sure that all asynchronous calls are properly awaited and that you are not missing any await keywords. This includes any methods that you call within your activity functions.
- Logging inside loops and tasks can sometimes cause performance issues. Ensure logging is not excessively verbose, especially within loops.
- Simplify your orchestration function to make it clearer and easier to debug.
- Ensure that you are correctly managing concurrency and thread safety, especially with the use of ConcurrentBag.
In the SaveAToContainer function:
- Using ConcurrentBag is a good choice for thread safety. Ensure other operations that could be thread-sensitive are also thread-safe.
- Make sure all asynchronous calls are properly awaited and handled.
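To illustrate the point about awaiting, here is a minimal sketch that uses plain .NET tasks instead of the Durable Functions SDK. ScanSiteAsync and RunAllAsync are hypothetical names invented for this example, and the failure for site "C" is simulated. It shows that starting a task does not throw; a failure only surfaces when the task is awaited, so that is where the try/catch belongs. In a real orchestrator the exception would be a TaskFailedException rather than the InvalidOperationException used here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for an activity call: fails for site "C" only.
static async Task<string> ScanSiteAsync(string site)
{
    await Task.Yield();
    if (site == "C")
    {
        throw new InvalidOperationException($"scan failed for {site}");
    }
    return $"{site}: done";
}

// Starts every scan first, then awaits them together and reports the outcome per site.
static async Task<List<string>> RunAllAsync(IReadOnlyList<string> sites)
{
    var log = new List<string>();

    // Fan out: creating the tasks does not throw, even for the failing site.
    List<Task<string>> tasks = sites.Select(site => ScanSiteAsync(site)).ToList();

    try
    {
        // Fan in: completes only after every task has finished.
        await Task.WhenAll(tasks);
    }
    catch (InvalidOperationException)
    {
        // The await rethrows only the first failure, so each task is inspected below.
    }

    for (var i = 0; i < tasks.Count; i++)
    {
        log.Add(tasks[i].IsFaulted ? $"{sites[i]}: failed" : tasks[i].Result);
    }
    return log;
}

var results = await RunAllAsync(new[] { "A", "B", "C", "D" });
Console.WriteLine(string.Join(", ", results));
```

Wrapping only the call that creates the task in a try/catch would never catch the failure for "C", which is why the catch sits around the await.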
With the above in place, tasks A, B, C, and D should all complete before the orchestration moves on to the next steps.
I hope this is helpful! Do not hesitate to let me know if you have any other questions.
**Please don't forget to close out the thread by upvoting and accepting this as an answer if it is helpful** so that others in the community facing similar issues can easily find the solution.
Best Regards,
Sina Salam