Scaling Azure Durable Function - Fan-out/Fan-in Scenario

Shreekumar Suggamad 0 Reputation points
2023-11-06T06:40:57.2333333+00:00

How can I scale my Azure Durable Functions fan-out/fan-in scenario using .NET Core C#? I am running multiple activity functions concurrently without any aggregation. The function app is built as a Docker image, pushed to Azure Container Registry, and deployed to an Azure Function App on a Linux App Service Plan.

My approach is:

  • Starter_Engine: Kafka trigger - an orchestration starter function. I receive one message per day on a Kafka topic that this function is bound to. Whenever a message arrives, it starts the orchestrator function.
  • OrchestrationFunc: an orchestrator function that calls the activity functions below:
    • CreateSnapshot: the first activity function called inside OrchestrationFunc. It takes no input and returns no output; it creates database snapshots by comparing the previous day's data with the current day's.
    • GetProductsListFunc: an activity function that returns a list of unique products from the database. The count stays fixed within a day but can differ from one day to the next.
    • ProcessFunc: an activity function that processes a single product; these calls are fanned out in parallel.

Here's my code

[FunctionName(FuncConstants.Starter_Engine)]
public async Task RunKafkaTrigger(
    [KafkaTrigger($"%{KafkaTriggerConstants.BootstrapServer}%",
        $"%{KafkaTriggerConstants.Topic}%",
        Username = $"%{KafkaTriggerConstants.ApiKey}%",
        Password = $"%{KafkaTriggerConstants.ApiSecret}%",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain,
        ConsumerGroup = $"%{KafkaTriggerConstants.ConsumerGroup}%")]
    KafkaEventData<string> kafkaEvent,
    [DurableClient] IDurableClient starter,
    ILogger log)
{
    try
    {
        var kafkaModel = JsonConvert.DeserializeObject<MyModel>(kafkaEvent.Value);
        if (kafkaModel != null)
        {
            var instanceId = await starter.StartNewAsync(FuncConstants.OrchestrationFunc, null);
            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");
        }
    }
    catch (Exception ex)
    {
        log.LogError(ex, "Error occurred while initiating the starter function");
    }
}
[FunctionName(FuncConstants.OrchestrationFunc)]
public async Task Run([OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
{
    log = context.CreateReplaySafeLogger(log);

    try
    {
        // Create store & depot snapshots
        var isSnapshotCreated = await context.CallActivityAsync<bool>(FuncConstants.CreateSnapshot, null);
        if (!isSnapshotCreated)
        {
            return;
        }

        // Get all the unique "Products" to be processed
        var productList = await context.CallActivityAsync<List<Product>>(FuncConstants.GetProductsListFunc, null);
        if (productList.NullOrEmpty())
        {
            log.LogError("No products are available for the day");
            return;
        }

        // Fan out: process each "Product" in parallel
        var parallelTasks = new List<Task>();
        foreach (var product in productList)
        {
            parallelTasks.Add(context.CallActivityAsync(FuncConstants.ProcessFunc, product));
        }

        // Fan in: wait for all results to come back
        await Task.WhenAll(parallelTasks);

        log.LogInformation("Orchestration has been completed successfully");
    }
    catch (Exception ex)
    {
        log.LogError(ex, "Error occurred in orchestration");
    }
}

Here, GetProductsListFunc returns a list of roughly 300,000 products each day, so the parallelTasks list ends up holding that many tasks. For concurrency, I have the following configuration in host.json:

"extensions": {
    "durableTask": {
      "maxConcurrentActivityFunctions": 20,
      "maxConcurrentOrchestratorFunctions": 3
    },
    "kafka": {
      "AutoOffsetReset": "latest"
	}
}

I observed that my last function, ProcessFunc, takes around 20 seconds to process each product, because it performs a series of database operations per product. Given that duration and the total number of tasks, the work will not finish within a day. To fix this, I want to auto-scale my Function App so that more activity functions run in parallel and all of them complete within a day.
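As a rough sanity check (a sketch, assuming the 20-second average holds and workers stay fully utilized), the current host.json cap of 20 concurrent activities per worker works out to about one product per second per worker:

```csharp
using System;

// Rough throughput estimate for the configuration above.
const int productCount = 300_000;
const double secondsPerProduct = 20;
const int maxConcurrentActivities = 20;   // per worker, from host.json

// Each worker sustains maxConcurrentActivities / secondsPerProduct products
// per second, i.e. 1 product/s, or 86,400 products per day.
double perWorkerPerDay = maxConcurrentActivities / secondsPerProduct * 86_400;
int workersNeeded = (int)Math.Ceiling(productCount / perWorkerPerDay);
Console.WriteLine(workersNeeded);         // ~4 workers, before any overhead
```

So on paper a handful of workers would suffice; in practice, partition rebalancing, storage I/O, and database contention add overhead, which is why the scale-out and per-worker settings both matter.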

Is my approach correct? Is there anything to consider with this kind of orchestration?

Any suggestions or resources would be appreciated.


1 answer

  1. MikeUrnun 9,777 Reputation points Moderator
    2023-11-14T03:12:57.2933333+00:00

    Hello @Shreekumar Suggamad - A few questions for ya:

    • With the way things are currently set up, do you observe free CPU/Memory that's not being utilized? If so, what is the %?
    • Assuming that you're running an isolated dotnet worker, do you have an optimal value set for the FUNCTIONS_WORKER_PROCESS_COUNT?
    • Looks like you're throttling concurrency at the per-worker level; are you also leveraging the instance caching and batching features?
    • Lastly, by rough calculation, ProcessFunc which alone takes 20 seconds, takes us to ~1667hrs for 300K records. For keeping it under 24hrs, could you clarify which Premium plan (EP1 vs EP2 vs EP3) you've opted for and any scale-out rules in place? Note that for Linux, some regions don't support max scale-out values.
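
    For reference, the per-worker knobs from the second and third bullets live in host.json and in app settings. The values below are illustrative only, not recommendations; they need to be tuned against the CPU/memory utilization you observe:

    ```json
    {
      "extensions": {
        "durableTask": {
          "maxConcurrentActivityFunctions": 100,
          "maxConcurrentOrchestratorFunctions": 5
        }
      }
    }
    ```

    The worker process count is an app setting rather than a host.json entry, e.g. FUNCTIONS_WORKER_PROCESS_COUNT = 4 (allowed range 1-10).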

    cc @David Justo for his insights & input on the use case
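
    One pattern worth considering with 300K activity calls from a single orchestration is to fan out through sub-orchestrations in fixed-size batches, which keeps the parent orchestration's history table small. The sketch below is not the original code; the orchestrator names and the batch size are assumptions, and it reuses the Product type and FuncConstants.ProcessFunc from the question:

    ```csharp
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Azure.WebJobs.Extensions.DurableTask;

    public static class BatchedFanOut
    {
        // Parent: splits the product list into batches and fans out one
        // sub-orchestration per batch, so its own history stays at
        // ~(productCount / batchSize) events instead of ~productCount.
        [FunctionName("ProcessAllProducts")]
        public static async Task ProcessAllProducts(
            [OrchestrationTrigger] IDurableOrchestrationContext context)
        {
            var products = context.GetInput<List<Product>>();
            const int batchSize = 1000; // assumption; tune to your workload

            var batchTasks = new List<Task>();
            for (int i = 0; i < products.Count; i += batchSize)
            {
                var batch = products.Skip(i).Take(batchSize).ToList();
                batchTasks.Add(context.CallSubOrchestratorAsync("ProcessBatch", batch));
            }
            await Task.WhenAll(batchTasks); // fan in over batches
        }

        // Child: fans out the activities for one batch and waits for them.
        [FunctionName("ProcessBatch")]
        public static async Task ProcessBatch(
            [OrchestrationTrigger] IDurableOrchestrationContext context)
        {
            var batch = context.GetInput<List<Product>>();
            var tasks = batch.Select(p =>
                context.CallActivityAsync(FuncConstants.ProcessFunc, p));
            await Task.WhenAll(tasks);
        }
    }
    ```

    Sub-orchestrations also spread the fan-out across control-queue partitions, which tends to scale out better than one giant orchestration instance.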

