I have created Azure Durable Functions to read a large volume of data from Azure Cosmos DB, perform some actions on it, and store the results in a new Cosmos DB container. I'm following the fan-out/fan-in approach shown below:
- The client sends an HTTP request to the function URL, which triggers an HttpTrigger function; that function in turn starts an orchestration (OrchestrationTrigger function).
- The orchestration calls an activity function (GetStoresToBeProcessed) to get all the distinct stores to be processed.
- The orchestration makes multiple parallel calls to an activity function (DirectToStoreWac) to process all the transactional data of each store.
- The orchestration waits for all calculations to complete.
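The steps listed above can be modelled in-process with plain Task.WhenAll to show the fan-out/fan-in shape. This is a minimal sketch, not Durable Functions code: FanOutFanIn, getStores and processStore are hypothetical names, with the two delegates standing in for the GetStoresToBeProcessed and DirectToStoreWac activity calls. The per-store counts are summed at the end, which the orchestration in this question does not currently do.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical in-process model of the orchestration steps.
// getStores stands in for GetStoresToBeProcessed and processStore for DirectToStoreWac;
// in Durable Functions these would be activity calls, not plain delegates.
public static class FanOutFanIn
{
    public static async Task<int> RunAsync(
        Func<Task<int[]>> getStores,
        Func<int, Task<int>> processStore)
    {
        // Step 1: get the distinct stores to process.
        int[] stores = await getStores();

        // Step 2 (fan-out): start one task per store without awaiting each one individually.
        Task<int>[] tasks = stores.Select(processStore).ToArray();

        // Step 3 (fan-in): wait for every task, then aggregate the per-store record counts.
        int[] recordsPerStore = await Task.WhenAll(tasks);
        return recordsPerStore.Sum();
    }
}
```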
This approach is working fine. I need suggestions on the following points:

The activity function (GetStoresToBeProcessed) returns 3000+ stores to the orchestration function, which then creates and awaits 3000+ tasks. Is there any way I can process the 3000+ stores in batches instead of sending 3000+ tasks in one go?
// Get all the stores to be processed.
int[] stores = await context.CallActivityAsync<int[]>(FuncConstants.GetStoresToBeProcessed, null);

// Fan out: start one DirectToStoreWac activity per store.
var parallelTasks = new List<Task<int>>();
foreach (var store in stores)
{
    Task<int> task = context.CallActivityAsync<int>(FuncConstants.DirectToStoreWac, store);
    parallelTasks.Add(task);
}

// Fan in: wait for all results to come back.
await Task.WhenAll(parallelTasks);
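One way to batch the 3000+ stores is to fan out one fixed-size batch at a time and await it before starting the next. The sketch below assumes a hypothetical helper (BatchedFanOut.RunInBatchesAsync) that takes the per-store call as a delegate so it can be exercised outside Durable Functions; the batch size is a tuning parameter, not a recommended value.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: processes store IDs in fixed-size batches.
// All stores in one batch run concurrently; the next batch starts only
// after every task in the previous batch has completed.
public static class BatchedFanOut
{
    public static async Task<int> RunInBatchesAsync(
        IReadOnlyList<int> storeIds,
        int batchSize,
        Func<int, Task<int>> processStore)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

        int total = 0;
        for (int offset = 0; offset < storeIds.Count; offset += batchSize)
        {
            // Fan out: start every call in the current batch.
            IEnumerable<int> batch = storeIds.Skip(offset).Take(batchSize);
            int[] results = await Task.WhenAll(batch.Select(processStore));

            // Fan in: aggregate this batch before moving on to the next one.
            total += results.Sum();
        }
        return total;
    }
}
```

Inside the orchestrator, the delegate could wrap the existing activity call, for example await BatchedFanOut.RunInBatchesAsync(stores, 500, id => context.CallActivityAsync<int>(FuncConstants.DirectToStoreWac, id)); where 500 is an arbitrary example value. The trade-off is that each batch waits for its slowest store before the next batch begins, so total run time can grow compared with fanning out everything at once.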
Each DirectToStoreWac activity invocation operates on a single store's data (approximately 80-90K records). Internally, _calculationService.Calculate(storeId) uses the Cosmos DB bulk feature via await _wacDirectToStoreCosmosRepository.AddBulkAsync(list).

Is it the correct approach to await a store's Cosmos bulk tasks (Task.WhenAll) inside the Task.WhenAll over the 3000 store calls?
[FunctionName(FuncConstants.DirectToStoreWac)]
public async Task<int> RunAsync([ActivityTrigger] int storeId, ILogger log)
{
    log.LogInformation($"The Store ({storeId}) has been Queued for Processing at {DateTime.UtcNow}");
    int totalRecordsProcessed = 0;
    try
    {
        // Calculate processes the store's records and writes the results via the Cosmos bulk feature.
        totalRecordsProcessed = await _calculationService.Calculate(storeId);
    }
    catch (Exception ex)
    {
        // Failures are logged and 0 is returned, so the orchestration still sees this activity as completed.
        log.LogError(ex, $"[{FuncConstants.DirectToStoreWac}] - Error occurred while processing Function");
    }
    return totalRecordsProcessed;
}
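The nesting asked about can be illustrated with an outer Task.WhenAll over stores in which each store's task itself awaits an inner Task.WhenAll over its chunked writes. In the real setup the two levels run in different places (the outer await is in the orchestrator, the inner one inside each DirectToStoreWac invocation); this sketch collapses them into one process only to show the shape. NestedWhenAll, chunkSize and writeChunk are hypothetical, with writeChunk standing in for a call such as AddBulkAsync.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical model: outer Task.WhenAll over stores, inner Task.WhenAll over each store's chunk writes.
public static class NestedWhenAll
{
    // Inner level: split one store's records into chunks and write all chunks concurrently.
    public static async Task<int> ProcessStoreAsync(
        IReadOnlyList<int> records,
        int chunkSize,
        Func<IReadOnlyList<int>, Task> writeChunk)
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));

        var writes = new List<Task>();
        for (int offset = 0; offset < records.Count; offset += chunkSize)
        {
            writes.Add(writeChunk(records.Skip(offset).Take(chunkSize).ToList()));
        }

        // The store's task completes only after every chunk write has completed.
        await Task.WhenAll(writes);
        return records.Count;
    }

    // Outer level: process all stores concurrently and sum their record counts.
    public static async Task<int> ProcessAllAsync(
        IReadOnlyDictionary<int, IReadOnlyList<int>> recordsByStore,
        int chunkSize,
        Func<IReadOnlyList<int>, Task> writeChunk)
    {
        int[] counts = await Task.WhenAll(
            recordsByStore.Values.Select(r => ProcessStoreAsync(r, chunkSize, writeChunk)));
        return counts.Sum();
    }
}
```

With the Azure Cosmos DB .NET SDK v3, bulk support is enabled through CosmosClientOptions.AllowBulkExecution and is used by starting many point operations concurrently and awaiting them together, which matches the inner level here; whether AddBulkAsync works that way depends on its implementation, which is not shown in the question.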