How can I scale my Azure Durable Functions fan-out/fan-in scenario using .NET Core and C#? I am running multiple functions concurrently without any aggregation. The Function App is built as a Docker image, pushed to Azure Container Registry, and deployed to Azure Functions on a Linux App Service plan.
My approach is:
- Starter_Engine: Kafka trigger. An orchestration starter function bound to a Kafka topic that receives one message per day. Whenever a message arrives, this function starts an orchestrator function.
- OrchestrationFunc: An orchestrator function that calls the activity functions below.
- CreateSnapshot: The first activity function called inside OrchestrationFunc. There is no input or output for this function. It creates database snapshots by comparing data from previous days with the current day.
- GetProductsListFunc: An activity function that returns a list of unique products from the database. The count stays the same for a given day but may change from one day to the next.
- ProcessFunc: An activity function that processes each product in parallel.
Here's my code:
[FunctionName(FuncConstants.Starter_Engine)]
public async Task RunKafkaTrigger(
    [KafkaTrigger($"%{KafkaTriggerConstants.BootstrapServer}%",
                  $"%{KafkaTriggerConstants.Topic}%",
                  Username = $"%{KafkaTriggerConstants.ApiKey}%",
                  Password = $"%{KafkaTriggerConstants.ApiSecret}%",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = $"%{KafkaTriggerConstants.ConsumerGroup}%")]
    KafkaEventData<string> kafkaEvent,
    [DurableClient] IDurableClient starter,
    ILogger log)
{
    try
    {
        // Start the orchestration only if the Kafka payload deserializes to a model
        var kafkaModel = JsonConvert.DeserializeObject<MyModel>(kafkaEvent.Value);
        if (kafkaModel != null)
        {
            var instanceId = await starter.StartNewAsync(FuncConstants.OrchestrationFunc, null);
            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
        }
    }
    catch (Exception ex)
    {
        log.LogError(ex, "Error occurred while Initiating Starter Function");
    }
}
[FunctionName(FuncConstants.OrchestrationFunc)]
public async Task Run([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
{
    // Replay-safe logger: suppresses duplicate log entries during orchestrator replay
    log = context.CreateReplaySafeLogger(log);
    try
    {
        // Create Store & Depot Snapshots
        var isSnapshotCreated = await context.CallActivityAsync<bool>(FuncConstants.CreateSnapshot, null);
        if (!isSnapshotCreated)
        {
            return;
        }

        // Get all the unique "Products" to be processed
        var productList = await context.CallActivityAsync<List<Product>>(FuncConstants.GetProductsListFunc, null);
        if (productList.NullOrEmpty())
        {
            log.LogError("No Products are available for the day");
            return;
        }

        // Process each "Product" in parallel (fan-out)
        var parallelTasks = new List<Task>();
        foreach (var product in productList)
        {
            var task = context.CallActivityAsync(FuncConstants.ProcessFunc, product);
            parallelTasks.Add(task);
        }

        // Wait for all activities to complete (fan-in, no aggregation)
        await Task.WhenAll(parallelTasks);
        log.LogInformation("Orchestration has been completed Successfully");
    }
    catch (Exception ex)
    {
        log.LogError(ex, "Error occurred in Orchestration");
    }
}
Here, GetProductsListFunc returns a List<String> with approximately 300,000 entries each day, so the parallelTasks list holds that many tasks. For concurrency, I have the following configuration in host.json:
"extensions": {
    "durableTask": {
        "maxConcurrentActivityFunctions": 20,
        "maxConcurrentOrchestratorFunctions": 3
    },
    "kafka": {
        "AutoOffsetReset": "latest"
    }
}
I observed that the last function, ProcessFunc, takes around 20 seconds per product because it performs a series of database operations for each one. Given that duration and the total number of tasks, the run will not finish within a day. To solve this, I want to auto-scale my Function App so that it runs more functions in parallel and completes all of these activities within a day.
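A rough estimate makes the timing concern concrete. It assumes a single host instance (the maxConcurrentActivityFunctions limit applies per instance), a flat 20 seconds per product, and no orchestration or queueing overhead. The symbols N (products per day), t (seconds per product), and c (concurrent activities) are introduced here for illustration only:

```latex
\begin{align*}
T_{\text{total}} &= \frac{N \cdot t}{c}
  = \frac{300{,}000 \times 20\ \text{s}}{20}
  = 300{,}000\ \text{s} \approx 83.3\ \text{h} \approx 3.5\ \text{days} \\
c_{\min} &= \left\lceil \frac{N \cdot t}{86{,}400\ \text{s}} \right\rceil
  = \left\lceil \frac{6{,}000{,}000}{86{,}400} \right\rceil
  = \lceil 69.4 \rceil = 70
\end{align*}
```

Under these assumptions, roughly 70 activities would need to run concurrently across all instances to finish within 24 hours, before allowing for any overhead or database contention.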
Is my approach correct? Is there anything I should consider for this kind of orchestration?
Any suggestions or resources would be appreciated.