Azure Durable Fan-Out/Fan-In Scenario with Cosmos Bulk Feature

Shreekumar Suggamad 136 Reputation points
2022-09-02T08:45:29.887+00:00

I have created Azure Durable Functions that read a large volume of data from Azure Cosmos DB, perform some actions on it, and store the results in a new Cosmos DB container. I'm following the fan-out/fan-in approach shown below.

  1. The client triggers an HTTP request (HttpTrigger function) via the function URL, which in turn calls an orchestration (OrchestrationTrigger function).
  2. The orchestration calls an activity function (GetStoresToBeProcessed) to get all the distinct stores to be processed.
  3. The orchestration makes multiple parallel calls to an activity function (DirectToStoreWac) to process all the transactional data of each store.
  4. The orchestration waits for all calculations to complete.

d9m32.png

This approach is working fine. I need suggestions for the below points:

The activity function (GetStoresToBeProcessed) returns 3000+ stores to the orchestrator function, which then creates and awaits 3000+ tasks. Is there any way I can batch the 3000+ stores instead of sending 3000+ tasks in one go?

// Get all the Stores to be Processed.  
int[] stores = await context.CallActivityAsync<int[]>(FuncConstants.GetStoresToBeProcessed, null);  
var parallelTasks = new List<Task<int>>();  
foreach (var store in stores)  
{  
    Task<int> task = context.CallActivityAsync<int>(FuncConstants.DirectToStoreWac, store);  
    parallelTasks.Add(task);  
}  
//wait for all results to come back  
await Task.WhenAll(parallelTasks);  

Each activity function (DirectToStoreWac) operates on a single store's data (approximately 80-90K records). Internally, _calculationService.Calculate(storeId) calls the Cosmos bulk feature via await _wacDirectToStoreCosmosRepository.AddBulkAsync(list).

Is it a correct approach to await the Cosmos bulk tasks of a single store (Task.WhenAll) inside the Task.WhenAll over the 3000+ store calls?

[FunctionName(FuncConstants.DirectToStoreWac)]  
public async Task<int> RunAsync([ActivityTrigger] int storeId, ILogger log)  
{  
    log.LogInformation($"The Store ({storeId}) has been Queued for Processing at {DateTime.UtcNow}");  
  
    try  
    {  
        // Calculate returns the number of records processed for this store.  
        return await _calculationService.Calculate(storeId);  
    }  
    catch (Exception ex)  
    {  
        // As before, a failure is logged and reported as 0 records rather than rethrown.  
        log.LogError(ex, $"[{FuncConstants.DirectToStoreWac}] - Error occurred while processing Function");  
        return 0;  
    }  
}  

1 answer

  1. MughundhanRaveendran-MSFT 12,456 Reputation points
    2022-09-26T12:18:50.917+00:00

    Hi @Shreekumar Suggamad ,

    Thanks for reaching out to Q&A.

    Here are the answers to your queries.

    The activity function (GetStoresToBeProcessed) returns 3000+ stores to the orchestrator function, which then creates and awaits 3000+ tasks. Is there any way I can batch the 3000+ stores instead of sending 3000+ tasks in one go?

    Yes. You can process the stores in smaller batches, a technique often referred to as "chunking". This blog post has some nice examples of how it can be done. This is just normal C# and isn't specific to Durable Functions.
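
    As a rough illustration of chunking, here is a minimal sketch that runs work in fixed-size batches and waits for each batch before starting the next. The helper name `BatchedFanOut.RunInBatchesAsync` and the batch size of 100 are hypothetical and are not taken from the question or from the blog post; treat it as one possible shape, not a definitive implementation.

    ```csharp
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    public static class BatchedFanOut
    {
        // Runs `work` for every item, at most `batchSize` items at a time,
        // and returns the results in the same order as the input.
        public static async Task<List<TResult>> RunInBatchesAsync<TItem, TResult>(
            IReadOnlyList<TItem> items, int batchSize, Func<TItem, Task<TResult>> work)
        {
            if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

            var results = new List<TResult>(items.Count);
            for (int start = 0; start < items.Count; start += batchSize)
            {
                // Start one task per item in the current batch.
                List<Task<TResult>> batch =
                    items.Skip(start).Take(batchSize).Select(work).ToList();

                // Wait for the whole batch before starting the next one.
                results.AddRange(await Task.WhenAll(batch));
            }
            return results;
        }
    }

    // Possible use inside the orchestrator (batch size 100 is an arbitrary assumption):
    //
    //   int[] stores = await context.CallActivityAsync<int[]>(FuncConstants.GetStoresToBeProcessed, null);
    //   List<int> counts = await BatchedFanOut.RunInBatchesAsync(
    //       stores, 100,
    //       store => context.CallActivityAsync<int>(FuncConstants.DirectToStoreWac, store));
    ```

    Because the helper only awaits the tasks handed to it, the orchestrator still does nothing but schedule activities and await them. The trade-off is that each batch waits for its slowest store before the next batch starts.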

    Is it a correct approach to await the Cosmos bulk tasks of a single store (Task.WhenAll) inside the Task.WhenAll over the 3000+ store calls?

    Yes, this is perfectly fine. It's often better to do concurrent work inside activity functions like this when possible because there is very little overhead involved, unlike when fanning out in orchestrator functions, which involves a lot of Azure Storage I/O.
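
    For illustration, concurrent work inside the activity could look roughly like the sketch below. The question does not show how AddBulkAsync is implemented, so everything here is an assumption: the `WacRecord` type, `storeId` as the partition key, the `int` return value, and a `Container` obtained from a `CosmosClient` created with `CosmosClientOptions.AllowBulkExecution = true`.

    ```csharp
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using Microsoft.Azure.Cosmos;

    // Hypothetical document type; the real schema is not shown in the question.
    public class WacRecord
    {
        public string id { get; set; }
        public int storeId { get; set; } // assumed to be the container's partition key
    }

    public class WacDirectToStoreCosmosRepository
    {
        private readonly Container _container;

        // Assumes the container comes from a CosmosClient created with
        // new CosmosClientOptions { AllowBulkExecution = true }.
        public WacDirectToStoreCosmosRepository(Container container) => _container = container;

        // Issues one create per record concurrently and waits for all of them.
        // Returns the number of records written (an assumed return type).
        public async Task<int> AddBulkAsync(IReadOnlyCollection<WacRecord> records)
        {
            List<Task<ItemResponse<WacRecord>>> writes = records
                .Select(r => _container.CreateItemAsync(r, new PartitionKey(r.storeId)))
                .ToList();

            await Task.WhenAll(writes);
            return writes.Count;
        }
    }
    ```

    All of this concurrency stays inside a single activity invocation, so the orchestrator only sees one task per store.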

    Hope this helps! Feel free to reach out to me if you have any queries or concerns.

    Please 'Accept as answer' and ‘Upvote’ if it helped so that it can help others in the community looking for help on similar topics.
