Fan-out/fan-in scenario

Fan-out/fan-in runs multiple activities in parallel and then aggregates the results. This article shows how to implement the pattern by using the Durable Task SDKs for .NET, JavaScript, Python, and Java.

Scenario overview

In this sample, the functions upload all files under a specified directory (recursively) to blob storage. They also count the total number of bytes uploaded.

A single function could handle everything, but it doesn't scale well. A single function execution runs on one virtual machine (VM), so throughput is limited to that one VM. Reliability is another concern: if the process fails midway through, or takes longer than five minutes, the backup can end in a partially completed state and must be restarted.

A more robust approach is to use two separate functions: one enumerates the files and adds file names to a queue, and the other reads from the queue and uploads the files to blob storage. This approach improves throughput and reliability, but you need to set up and manage the queue. More importantly, this approach adds complexity for state management and coordination, like reporting the total number of bytes uploaded.

Durable Functions provides the throughput and reliability benefits of the queue-based approach without the overhead of managing queues and coordination state yourself.

In the following example, the orchestrator processes multiple work items in parallel and then aggregates the results. This pattern is useful when you need to:

  • Process a batch of items where each item can be processed independently
  • Distribute work across multiple machines for better throughput
  • Aggregate results from all parallel operations

Without the fan-out/fan-in pattern, you either process items sequentially, which limits throughput, or you manage your own queuing and coordination logic, which adds complexity.

The Durable Task SDKs handle parallelization and coordination, so the pattern is simple to implement.
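Before looking at any SDK code, the shape of the pattern itself can be illustrated in a plain Python sketch (not Durable SDK code): fan work out to a thread pool, then fan the results back in by merging them. The function names here are hypothetical, chosen to mirror the sample's activities.

```python
from concurrent.futures import ThreadPoolExecutor

def process_work_item(item: str) -> dict:
    # Stand-in for an activity: map each work item to its length.
    return {item: len(item)}

def fan_out_fan_in(work_items: list[str]) -> dict:
    # Fan out: submit every item to the pool so they run concurrently.
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(process_work_item, work_items))
    # Fan in: merge the per-item results into one aggregate.
    aggregated: dict = {}
    for result in results:
        aggregated.update(result)
    return aggregated

print(fan_out_fan_in(["item1", "item22"]))  # → {'item1': 5, 'item22': 6}
```

Unlike this in-process sketch, the Durable Task SDKs can run each item on a different machine and survive process restarts.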

The functions

This article describes the functions in the sample app:

  • E2_BackupSiteContent: An orchestrator function that calls E2_GetFileList to get a list of files to back up, and then calls E2_CopyFileToBlob for each file.
  • E2_GetFileList: An activity function that returns a list of files in a directory.
  • E2_CopyFileToBlob: An activity function that backs up a single file to Azure Blob Storage.

This article describes the components in the example code:

  • ParallelProcessingOrchestration, fanOutFanInOrchestrator, fan_out_fan_in_orchestrator, or FanOutFanIn_WordCount: An orchestrator that fans out work to multiple activities in parallel, waits for all activities to complete, and then fans in by aggregating the results.
  • ProcessWorkItemActivity, processWorkItem, process_work_item, or CountWords: An activity that processes a single work item.
  • AggregateResultsActivity, aggregateResults, or aggregate_results: An activity that aggregates results from all parallel operations.

Orchestrator

This orchestrator function does the following:

  1. Takes rootDirectory as input.
  2. Calls a function to get a recursive list of files under rootDirectory.
  3. Makes parallel function calls to upload each file to Azure Blob Storage.
  4. Waits for all uploads to complete.
  5. Returns the total number of bytes uploaded to Azure Blob Storage.
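The same five steps can be sketched framework-free in Python (no Durable SDK; the blob upload is replaced by a size lookup, so `copy_file_to_blob` here is a hypothetical stand-in):

```python
import os
from concurrent.futures import ThreadPoolExecutor

def get_file_list(root_directory: str) -> list[str]:
    # Step 2: recursively enumerate files under the root directory.
    return [os.path.join(dirpath, name)
            for dirpath, _, names in os.walk(root_directory)
            for name in names]

def copy_file_to_blob(file_path: str) -> int:
    # Step 3 stand-in: a real activity would upload the file to blob
    # storage; here we just report its size in bytes.
    return os.path.getsize(file_path)

def backup_site_content(root_directory: str) -> int:
    files = get_file_list(root_directory)
    # Steps 3-4: run the per-file work in parallel and wait for all of it.
    with ThreadPoolExecutor() as pool:
        byte_counts = pool.map(copy_file_to_blob, files)
    # Step 5: aggregate the per-file byte counts.
    return sum(byte_counts)
```

The Durable Functions version below has the same structure, but the per-file calls become activity invocations that can run on different VMs.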

Here is the code that implements the orchestrator function:

[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

Notice the await Task.WhenAll(tasks); line. The code doesn't await the individual calls to E2_CopyFileToBlob, so they run in parallel. When the orchestrator passes the task array to Task.WhenAll, it gets back a task that doesn't complete until all the copy operations have completed. If you're familiar with the Task Parallel Library (TPL) in .NET, you'll recognize this pattern. The difference is that these tasks could be running on multiple virtual machines concurrently, and the Durable Functions extension ensures that the end-to-end execution is resilient to process recycling.

After the orchestrator awaits Task.WhenAll, all function calls are complete and have returned values. Each call to E2_CopyFileToBlob returns the number of bytes uploaded, so the orchestrator calculates the total by summing the return values.
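The "start all, await all, sum the results" shape of Task.WhenAll has a direct analogue in other runtimes. As a hedged plain-Python illustration (asyncio, not the Durable SDK, with the activity reduced to a stub):

```python
import asyncio

async def copy_file_to_blob(byte_count: int) -> int:
    # Stub for the activity call; returns the bytes "uploaded".
    await asyncio.sleep(0)
    return byte_count

async def run(sizes: list[int]) -> int:
    # Start every call without awaiting it individually (fan-out)...
    tasks = [asyncio.create_task(copy_file_to_blob(n)) for n in sizes]
    # ...then await them all at once, like Task.WhenAll (fan-in).
    results = await asyncio.gather(*tasks)
    return sum(results)

print(asyncio.run(run([100, 250, 7])))  # → 357
```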

The orchestrator does the following:

  1. Takes a list of work items as input.
  2. Fans out by creating a task for each work item and processing them in parallel.
  3. Waits for all parallel tasks to complete.
  4. Fans in by aggregating the results.

The following code implements the orchestrator:

using Microsoft.DurableTask;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class ParallelProcessingOrchestration : TaskOrchestrator<List<string>, Dictionary<string, int>>
{
    public override async Task<Dictionary<string, int>> RunAsync(
        TaskOrchestrationContext context, List<string> workItems)
    {
        // Step 1: Fan-out by creating a task for each work item in parallel
        var processingTasks = new List<Task<Dictionary<string, int>>>();

        foreach (string workItem in workItems)
        {
            // Create a task for each work item (fan-out)
            Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
                nameof(ProcessWorkItemActivity), workItem);
            processingTasks.Add(task);
        }

        // Step 2: Wait for all parallel tasks to complete
        Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

        // Step 3: Fan-in by aggregating all results
        Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
            nameof(AggregateResultsActivity), results);

        return aggregatedResults;
    }
}

Use Task.WhenAll() to wait for all parallel tasks to complete. The Durable Task SDK ensures that the tasks can run on multiple machines concurrently, and the execution is resilient to process restarts.

Activities

The helper activity functions are regular functions that use the activityTrigger binding.

E2_GetFileList activity function

[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

Note

Don't put this code in the orchestrator function. Orchestrator functions shouldn't do I/O, including local file system access. For more information, see Orchestrator function code constraints.

E2_CopyFileToBlob activity function

[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

Note

To run the sample code, install the Microsoft.Azure.WebJobs.Extensions.Storage NuGet package.

The function uses Azure Functions binding features like the Binder parameter. You don't need those details for this walkthrough.

The implementation loads the file from disk and asynchronously streams the contents into a blob of the same name in the backups container. The function returns the number of bytes copied to storage. The orchestrator uses that value to compute the aggregate sum.
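The path-to-blob-name mapping (strip the drive-letter root, switch to forward slashes, prefix with the container path) can be sketched in isolation. The Windows-style input below is a hypothetical example, and the sketch assumes drive-letter paths:

```python
import ntpath  # Windows path semantics, regardless of the host OS

def to_blob_path(file_path: str) -> str:
    # Drop the drive prefix (e.g. "D:") and any leading separators,
    # mirroring the Substring(Path.GetPathRoot(...).Length) call above...
    relative = ntpath.splitdrive(file_path)[1].lstrip("\\/")
    # ...and convert backslashes to the forward slashes blob names use.
    return "backups/" + relative.replace("\\", "/")

print(to_blob_path("D:\\home\\LogFiles\\app.log"))
# → backups/home/LogFiles/app.log
```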

Note

This example moves the I/O operations into an activityTrigger function, so the work can be distributed across many machines and progress can be checkpointed. If the host process is interrupted, the orchestration knows which uploads have already completed.

Activities do the work. Unlike orchestrators, activities can perform I/O operations and nondeterministic logic.

Process work item activity

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    private readonly ILogger<ProcessWorkItemActivity> _logger;

    public ProcessWorkItemActivity(ILogger<ProcessWorkItemActivity> logger)
    {
        _logger = logger;
    }

    public override Task<Dictionary<string, int>> RunAsync(TaskActivityContext context, string workItem)
    {
        _logger.LogInformation("Processing work item: {WorkItem}", workItem);

        // Process the work item (this is where you do the actual work)
        var result = new Dictionary<string, int>
        {
            { workItem, workItem.Length }
        };

        return Task.FromResult(result);
    }
}

Aggregate results activity

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    private readonly ILogger<AggregateResultsActivity> _logger;

    public AggregateResultsActivity(ILogger<AggregateResultsActivity> logger)
    {
        _logger = logger;
    }

    public override Task<Dictionary<string, int>> RunAsync(
        TaskActivityContext context, Dictionary<string, int>[] results)
    {
        _logger.LogInformation("Aggregating {Count} results", results.Length);

        // Combine all results into one aggregated result
        var aggregatedResult = new Dictionary<string, int>();

        foreach (var result in results)
        {
            foreach (var kvp in result)
            {
                aggregatedResult[kvp.Key] = kvp.Value;
            }
        }

        return Task.FromResult(aggregatedResult);
    }
}
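The fan-in step above is a plain dictionary merge; note that if two activities were to return the same key, the later result would overwrite the earlier one. A minimal Python sketch of the same merge:

```python
def aggregate_results(results: list[dict]) -> dict:
    # Merge per-activity dictionaries; later entries win on key collisions.
    aggregated: dict = {}
    for result in results:
        aggregated.update(result)
    return aggregated

print(aggregate_results([{"item1": 5}, {"item2": 5}, {"item1": 9}]))
# → {'item1': 9, 'item2': 5}
```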

Run the sample

Start the orchestration on Windows by sending the following HTTP POST request:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

Alternatively, on a Linux function app (Python function apps on App Service currently run only on Linux), start the orchestration by sending the following HTTP POST request:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

Note

The HttpStart function expects JSON. Include the Content-Type: application/json header, and encode the directory path as a JSON string. The HTTP snippet assumes host.json has an entry that removes the default api/ prefix from all HTTP trigger function URLs. Find the markup for this configuration in the sample host.json file.

This HTTP request triggers the E2_BackupSiteContent orchestrator and passes the string D:\home\LogFiles as a parameter. The response has a link to check the status of the backup operation:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

Depending on the number of log files in your function app, this operation can take several minutes to finish. Get the latest status by querying the URL in the Location header of the previous HTTP 202 response:

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

In this case, the function is still running. The response shows the input saved in the orchestrator state and the last updated time. Use the Location header value to poll for completion. When the status is "Completed", the response resembles the following example:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

The response shows that the orchestration is complete and, from the timestamps, approximately how long it took to finish. The output field indicates that the orchestration uploaded about 450 KB of logs.
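The status check above can be wrapped in a small polling helper. The sketch below is framework-neutral: it takes an injected get_status callable (for example, one that issues the GET request shown earlier and parses the JSON body), so it can be exercised without a live endpoint. The helper name and terminal-status set are assumptions for illustration.

```python
import time

TERMINAL_STATUSES = {"Completed", "Failed", "Terminated"}

def poll_until_complete(get_status, interval_seconds: float = 2.0,
                        max_polls: int = 100) -> dict:
    """Call get_status() until runtimeStatus is terminal, then return the payload."""
    for _ in range(max_polls):
        status = get_status()  # e.g. GET the Location URL and parse the JSON
        if status["runtimeStatus"] in TERMINAL_STATUSES:
            return status
        time.sleep(interval_seconds)
    raise TimeoutError("orchestration did not finish within the polling budget")
```

For example, feeding it a stub that first reports Running and then Completed returns the final payload after two calls.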

To run the example:

  1. Start the Durable Task Scheduler emulator for local development.

    docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
    
  2. Start the worker to register the orchestrator and activities.

  3. Run the client to schedule an orchestration with a list of work items:

// Schedule the orchestration with a list of work items
var workItems = new List<string> { "item1", "item2", "item3", "item4", "item5" };
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    nameof(ParallelProcessingOrchestration), workItems);

// Wait for completion
var result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true);
Console.WriteLine($"Result: {result.ReadOutputAs<Dictionary<string, int>>().Count} items processed");

Next steps

This article demonstrates the fan-out/fan-in pattern. The next sample shows how to implement the monitor pattern by using durable timers.

For JavaScript SDK examples, see the Durable Task JavaScript SDK samples.