Fan-out/fan-in runs multiple functions in parallel and then aggregates the results. This article shows an example that uses Durable Functions to back up some or all of an app's site content to Azure Storage.
Fan-out/fan-in runs multiple activities in parallel and then aggregates the results. This article shows how to implement the pattern by using the Durable Task SDKs for .NET, JavaScript, Python, and Java.
Scenario overview
In this sample, the functions upload all files under a specified directory (recursively) to blob storage. They also count the total number of bytes uploaded.
A single function can handle everything, but it doesn't scale well. A single function execution runs on one virtual machine (VM), so throughput is limited to that VM. Reliability is another concern: if the process fails midway through, or takes more than five minutes, the backup can end in a partially completed state and has to be restarted from the beginning.
A more robust approach is to use two separate functions: one enumerates the files and adds file names to a queue, and the other reads from the queue and uploads the files to blob storage. This approach improves throughput and reliability, but you need to set up and manage the queue. More importantly, this approach adds complexity for state management and coordination, like reporting the total number of bytes uploaded.
Durable Functions provides all of these benefits, parallel throughput, reliable checkpointing, and coordinated state, with very little extra code.
In the following example, the orchestrator processes multiple work items in parallel and then aggregates the results. This pattern is useful when you need to:
- Process a batch of items where each item can be processed independently
- Distribute work across multiple machines for better throughput
- Aggregate results from all parallel operations
Without the fan-out/fan-in pattern, you either process items sequentially, which limits throughput, or you manage your own queuing and coordination logic, which adds complexity.
The Durable Task SDKs handle parallelization and coordination, so the pattern is simple to implement.
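The fan-out/fan-in shape is the same one that plain `asyncio` code uses with `asyncio.gather`. As a rough, framework-free analogy (this sketch is not durable and doesn't survive a process restart, which is exactly what the Durable Task SDKs add):

```python
import asyncio

# A stand-in for an activity: processes one work item independently.
async def process_item(item: int) -> int:
    await asyncio.sleep(0)  # simulate asynchronous work
    return item * item

async def fan_out_fan_in(work_items: list[int]) -> int:
    # Fan-out: start one task per item; none of them is awaited individually.
    tasks = [asyncio.create_task(process_item(i)) for i in work_items]
    # Wait for all tasks to complete (analogous to Task.WhenAll / task.when_all).
    results = await asyncio.gather(*tasks)
    # Fan-in: aggregate the results.
    return sum(results)

print(asyncio.run(fan_out_fan_in([1, 2, 3, 4])))  # → 30
```

Unlike this in-process sketch, the durable version can spread the tasks across machines and replay the orchestration after a crash.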
The functions
This article describes the functions in the sample app:
E2_BackupSiteContent: An orchestrator function that calls E2_GetFileList to get a list of files to back up, and then calls E2_CopyFileToBlob for each file.
E2_GetFileList: An activity function that returns a list of files in a directory.
E2_CopyFileToBlob: An activity function that backs up a single file to Azure Blob Storage.
This article describes the components in the example code:
ParallelProcessingOrchestration, fanOutFanInOrchestrator, fan_out_fan_in_orchestrator, or FanOutFanIn_WordCount: An orchestrator that fans out work to multiple activities in parallel, waits for all activities to complete, and then fans in by aggregating the results.
ProcessWorkItemActivity, processWorkItem, process_work_item, or CountWords: An activity that processes a single work item.
AggregateResultsActivity, aggregateResults, or aggregate_results: An activity that aggregates results from all parallel operations.
Orchestrator
This orchestrator function does the following:
- Takes rootDirectory as input.
- Calls a function to get a recursive list of files under rootDirectory.
- Makes parallel function calls to upload each file to Azure Blob Storage.
- Waits for all uploads to complete.
- Returns the total number of bytes uploaded to Azure Blob Storage.
Here is the code that implements the orchestrator function:
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Notice the await Task.WhenAll(tasks); line. The code doesn't await the individual calls to E2_CopyFileToBlob, so they run in parallel. When the orchestrator passes the task array to Task.WhenAll, it gets back a task that doesn't complete until all copy operations complete. If you're familiar with the Task Parallel Library (TPL) in .NET, this pattern will look familiar. The difference is that these tasks could be running on multiple virtual machines concurrently, and the Durable Functions extension ensures that the end-to-end execution is resilient to process recycling.
After the orchestrator awaits Task.WhenAll, all activity calls have completed and returned values. Each call to E2_CopyFileToBlob returns the number of bytes uploaded, so the orchestrator calculates the total by summing the return values.
V3 programming model
The function uses the standard function.json for orchestrator functions.
{
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
],
"disabled": false
}
Here is the code that implements the orchestrator function:
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
Notice the yield context.df.Task.all(tasks); line. The code doesn't yield the individual calls to E2_CopyFileToBlob, so they run in parallel. When the orchestrator passes the task array to context.df.Task.all, it returns a task that doesn't complete until all copy operations complete. If you're familiar with Promise.all in JavaScript, then this is not new to you. The difference is that these tasks could be running on multiple virtual machines concurrently, and the Durable Functions extension ensures that the end-to-end execution is resilient to process recycling.
Note
Although tasks are conceptually similar to JavaScript promises, orchestrator functions should use context.df.Task.all and context.df.Task.any instead of Promise.all and Promise.race to manage task parallelization.
After the orchestrator yields context.df.Task.all, all function calls are complete and return values. Each call to E2_CopyFileToBlob returns the number of bytes uploaded, so calculating the sum total byte count is a matter of adding all those return values together.
V4 programming model
Here is the code that implements the orchestrator function:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace(/\\/g, "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
Notice the yield context.df.Task.all(tasks); line. The code doesn't yield the individual calls to copyFileToBlob, so they run in parallel. When the orchestrator passes the task array to context.df.Task.all, it gets back a task that doesn't complete until all copy operations complete. If you're familiar with Promise.all in JavaScript, then this is not new to you. The difference is that these tasks could be running on multiple virtual machines concurrently, and the Durable Functions extension ensures that the end-to-end execution is resilient to process recycling.
Note
Although tasks are conceptually similar to JavaScript promises, orchestrator functions should use context.df.Task.all and context.df.Task.any instead of Promise.all and Promise.race to manage task parallelization.
After the orchestrator yields context.df.Task.all, all function calls are complete and return values. Each call to copyFileToBlob returns the number of bytes uploaded, so calculating the sum total byte count is a matter of adding all those return values together.
The function uses the standard function.json for orchestrator functions.
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
]
}
Here is the code that implements the orchestrator function:
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Notice the yield context.task_all(tasks) line. The code doesn't yield the individual calls to E2_CopyFileToBlob, so they run in parallel. When the orchestrator passes the task array to context.task_all, it returns a task that doesn't complete until all copy operations complete. If you're familiar with asyncio.gather in Python, then this is not new to you. The difference is that these tasks could be running on multiple virtual machines concurrently, and the Durable Functions extension ensures that the end-to-end execution is resilient to process recycling.
Note
Although tasks are conceptually similar to Python awaitables, orchestrator functions should use yield as well as the context.task_all and context.task_any APIs to manage task parallelization.
After the orchestrator yields context.task_all, all function calls are complete and return values. Each call to E2_CopyFileToBlob returns the number of bytes uploaded, so we can calculate the sum total byte count by adding all the return values together.
A PowerShell sample isn't available yet.
A Java sample isn't available yet.
The orchestrator does the following:
- Takes a list of work items as input.
- Fans out by creating a task for each work item and processing them in parallel.
- Waits for all parallel tasks to complete.
- Fans in by aggregating the results.
using Microsoft.DurableTask;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class ParallelProcessingOrchestration : TaskOrchestrator<List<string>, Dictionary<string, int>>
{
public override async Task<Dictionary<string, int>> RunAsync(
TaskOrchestrationContext context, List<string> workItems)
{
// Step 1: Fan-out by creating a task for each work item in parallel
var processingTasks = new List<Task<Dictionary<string, int>>>();
foreach (string workItem in workItems)
{
// Create a task for each work item (fan-out)
Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
nameof(ProcessWorkItemActivity), workItem);
processingTasks.Add(task);
}
// Step 2: Wait for all parallel tasks to complete
Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
// Step 3: Fan-in by aggregating all results
Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
nameof(AggregateResultsActivity), results);
return aggregatedResults;
}
}
Use Task.WhenAll() to wait for all parallel tasks to complete. The Durable Task SDK ensures that the tasks can run on multiple machines concurrently, and the execution is resilient to process restarts.
import {
OrchestrationContext,
TOrchestrator,
whenAll,
} from "@microsoft/durabletask-js";
const fanOutFanInOrchestrator: TOrchestrator = async function* (
ctx: OrchestrationContext,
workItems: string[]
): any {
// Fan-out: create a task for each work item in parallel
const tasks = workItems.map((item) => ctx.callActivity(processWorkItem, item));
// Wait for all parallel tasks to complete
const results: number[] = yield whenAll(tasks);
// Fan-in: aggregate all results
const aggregatedResult = yield ctx.callActivity(aggregateResults, results);
return aggregatedResult;
};
Use whenAll() to wait for all parallel tasks to complete. The Durable Task SDK ensures that the tasks can run on multiple machines concurrently, and the execution is resilient to process restarts.
from durabletask import task
def fan_out_fan_in_orchestrator(ctx: task.OrchestrationContext, work_items: list) -> dict:
"""Orchestrator that demonstrates fan-out/fan-in pattern."""
# Fan-out: Create a task for each work item
parallel_tasks = []
for item in work_items:
parallel_tasks.append(ctx.call_activity(process_work_item, input=item))
# Wait for all tasks to complete
results = yield task.when_all(parallel_tasks)
# Fan-in: Aggregate all the results
final_result = yield ctx.call_activity(aggregate_results, input=results)
return final_result
Use task.when_all() to wait for all parallel tasks to complete. The Durable Task SDK ensures that the tasks can run on multiple machines concurrently, and the execution is resilient to process restarts.
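To build intuition for how a yield-based orchestrator cooperates with its host, here's a toy, non-durable simulation in plain Python generators. This is not the Durable Task runtime (which also checkpoints and replays history); it only shows the yield/resume handshake:

```python
# Toy orchestrator: yields a batch of task descriptions, receives the results.
def orchestrator(work_items):
    # Fan-out: describe one task per item (here, just (name, input) tuples).
    tasks = [("square", item) for item in work_items]
    # Yield the whole batch at once (like task.when_all) and receive the results.
    results = yield tasks
    # Fan-in: aggregate.
    return sum(results)

# Toy host: runs the "activities" inline and resumes the orchestrator.
def toy_host(gen_fn, work_items):
    gen = gen_fn(work_items)
    tasks = next(gen)                               # orchestrator yields its task batch
    results = [item * item for (_, item) in tasks]  # "activities" run here
    try:
        gen.send(results)                           # resume the orchestrator with results
    except StopIteration as stop:
        return stop.value                           # the orchestrator's return value

print(toy_host(orchestrator, [1, 2, 3]))  # → 14
```

The real host runs activities out of process (possibly on other machines) and records each result, so that resuming after a crash replays the generator to the same point.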
This sample is available for .NET, JavaScript, Java, and Python.
import com.microsoft.durabletask.*;
import java.util.List;
import java.util.stream.Collectors;
DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "FanOutFanIn_WordCount"; }
@Override
public TaskOrchestration create() {
return ctx -> {
List<?> inputs = ctx.getInput(List.class);
// Fan-out: Create a task for each input item
List<Task<Integer>> tasks = inputs.stream()
.map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
.collect(Collectors.toList());
// Wait for all parallel tasks to complete
List<Integer> allResults = ctx.allOf(tasks).await();
// Fan-in: Aggregate results
int totalCount = allResults.stream().mapToInt(Integer::intValue).sum();
ctx.complete(totalCount);
};
}
})
.build();
Use ctx.allOf(tasks).await() to wait for all parallel tasks to complete. The Durable Task SDK ensures that the tasks can run on multiple machines concurrently, and the execution is resilient to process restarts.
Activities
The helper activity functions are regular functions that use the activityTrigger binding.
E2_GetFileList activity function
[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
[ActivityTrigger] string rootDirectory,
ILogger log)
{
log.LogInformation($"Searching for files under '{rootDirectory}'...");
string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");
return files;
}
V3 programming model
The function.json file for E2_GetFileList looks like the following example:
{
"bindings": [
{
"name": "rootDirectory",
"type": "activityTrigger",
"direction": "in"
}
],
"disabled": false
}
Here's the implementation:
const readdirp = require("readdirp");
module.exports = function (context, rootDirectory) {
context.log(`Searching for files under '${rootDirectory}'...`);
const allFilePaths = [];
readdirp(
{ root: rootDirectory, entryType: "all" },
function (fileInfo) {
if (!fileInfo.stat.isDirectory()) {
allFilePaths.push(fileInfo.fullPath);
}
},
function (err, res) {
if (err) {
throw err;
}
context.log(`Found ${allFilePaths.length} file(s) under ${rootDirectory}.`);
context.done(null, allFilePaths);
}
);
};
The function uses the readdirp module (version 2.x) to recursively read the directory structure.
V4 programming model
Here's the implementation of the getFileList activity function:
const df = require("durable-functions");
const readdirp = require("readdirp");
const getFileListActivityName = "getFileList";
df.app.activity(getFileListActivityName, {
handler: async function (rootDirectory, context) {
context.log(`Searching for files under '${rootDirectory}'...`);
const allFilePaths = [];
for await (const entry of readdirp(rootDirectory, { type: "files" })) {
allFilePaths.push(entry.fullPath);
}
context.log(`Found ${allFilePaths.length} file(s) under ${rootDirectory}.`);
return allFilePaths;
},
});
The function uses the readdirp module (version 3.x) to recursively read the directory structure.
The function.json file for E2_GetFileList looks like the following example:
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "rootDirectory",
"type": "activityTrigger",
"direction": "in"
}
]
}
Here's the implementation:
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# Walk the directory tree recursively
for path, _, files in os.walk(rootDirectory):
# Limit the backup to the sample's own activity and orchestrator folders
if "E2_" in path:
# Add the full path of each code file to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
A PowerShell sample isn't available yet.
Note
Don't put this code in the orchestrator function. Orchestrator functions shouldn't do I/O, including local file system access. For more information, see Orchestrator function code constraints.
E2_CopyFileToBlob activity function
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Note
To run the sample code, install the Microsoft.Azure.WebJobs.Extensions.Storage NuGet package.
The function uses Azure Functions binding features like the Binder parameter. You don't need those details for this walkthrough.
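The blob path computation in the activity above, stripping the drive root and normalizing the separators, can be checked in isolation. Here's a quick illustration in Python (the sample itself does this in C# with Path.GetPathRoot and Replace):

```python
import ntpath  # Windows path semantics regardless of host OS

def to_blob_path(file_path: str) -> str:
    # Strip the drive letter prefix (e.g. "D:\\") and convert backslashes to
    # forward slashes, mirroring the Substring/Replace logic in E2_CopyFileToBlob.
    root = ntpath.splitdrive(file_path)[0] + "\\"
    return "backups/" + file_path[len(root):].replace("\\", "/")

print(to_blob_path("D:\\home\\LogFiles\\eventlog.xml"))
# → backups/home/LogFiles/eventlog.xml
```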
V3 programming model
The function.json file for E2_CopyFileToBlob is similarly simple:
{
"bindings": [
{
"name": "filePath",
"type": "activityTrigger",
"direction": "in"
},
{
"name": "out",
"type": "blob",
"path": "",
"connection": "AzureWebJobsStorage",
"direction": "out"
}
],
"disabled": false
}
The JavaScript implementation uses the Azure Storage SDK for Node to upload the files to Azure Blob Storage.
const fs = require("fs");
const path = require("path");
const storage = require("azure-storage");
module.exports = function (context, filePath) {
const container = "backups";
const root = path.parse(filePath).root;
const blobPath = filePath.substring(root.length).replace(/\\/g, "/"); // replace all backslashes, not just the first
const outputLocation = `backups/${blobPath}`;
const blobService = storage.createBlobService();
blobService.createContainerIfNotExists(container, (error) => {
if (error) {
throw error;
}
fs.stat(filePath, function (error, stats) {
if (error) {
throw error;
}
context.log(
`Copying '${filePath}' to '${outputLocation}'. Total bytes = ${stats.size}.`
);
const readStream = fs.createReadStream(filePath);
blobService.createBlockBlobFromStream(
container,
blobPath,
readStream,
stats.size,
function (error) {
if (error) {
throw error;
}
context.done(null, stats.size);
}
);
});
});
};
V4 programming model
The JavaScript implementation of copyFileToBlob uses an Azure Storage output binding to upload the files to Azure Blob Storage.
const df = require("durable-functions");
const fs = require("fs/promises");
const { output } = require("@azure/functions");
const copyFileToBlobActivityName = "copyFileToBlob";
const blobOutput = output.storageBlob({
path: "backups/{backupPath}",
connection: "StorageConnString",
});
df.app.activity(copyFileToBlobActivityName, {
extraOutputs: [blobOutput],
handler: async function ({ backupPath, filePath }, context) {
const outputLocation = `backups/${backupPath}`;
const stats = await fs.stat(filePath);
context.log(`Copying '${filePath}' to '${outputLocation}'. Total bytes = ${stats.size}.`);
const fileContents = await fs.readFile(filePath);
context.extraOutputs.set(blobOutput, fileContents);
return stats.size;
},
});
The function.json file for E2_CopyFileToBlob is similarly simple:
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "filePath",
"type": "activityTrigger",
"direction": "in"
}
]
}
The Python implementation uses the Azure Storage SDK for Python to upload the files to Azure Blob Storage.
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
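The blob name above is built from the last two path components. That naming logic, replicated standalone so you can see what it produces:

```python
import pathlib

def to_blob_name(file_path: str) -> str:
    # Join the last two path components with an underscore,
    # mirroring the naming logic in the activity above.
    parent_dir, fname = pathlib.PurePosixPath(file_path).parts[-2:]
    return parent_dir + "_" + fname

print(to_blob_name("/home/site/wwwroot/E2_GetFileList/__init__.py"))
# → E2_GetFileList___init__.py
```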
A PowerShell sample isn't available yet.
The implementation loads the file from disk and asynchronously streams the contents into a blob of the same name in the backups container. The function returns the number of bytes copied to storage. The orchestrator uses that value to compute the aggregate sum.
Note
This example moves the I/O operations into an activityTrigger function. The work can be distributed across multiple machines, and progress is checkpointed automatically: if the host process is recycled partway through, the orchestration knows which uploads have already completed and resumes where it left off.
Activities do the work. Unlike orchestrators, activities can perform I/O operations and nondeterministic logic.
Process work item activity
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
private readonly ILogger<ProcessWorkItemActivity> _logger;
public ProcessWorkItemActivity(ILogger<ProcessWorkItemActivity> logger)
{
_logger = logger;
}
public override Task<Dictionary<string, int>> RunAsync(TaskActivityContext context, string workItem)
{
_logger.LogInformation("Processing work item: {WorkItem}", workItem);
// Process the work item (this is where you do the actual work)
var result = new Dictionary<string, int>
{
{ workItem, workItem.Length }
};
return Task.FromResult(result);
}
}
import { ActivityContext } from "@microsoft/durabletask-js";
const processWorkItem = async (
_ctx: ActivityContext,
item: string
): Promise<number> => {
console.log(`Processing work item: "${item}"`);
return item.length;
};
Unlike orchestrators, activities can perform I/O operations like HTTP calls, database queries, and file access.
from durabletask import task
def process_work_item(ctx: task.ActivityContext, item: int) -> dict:
"""Activity that processes a single work item."""
# Process the work item (this is where you do the actual work)
result = item * item
return {"item": item, "result": result}
import java.util.StringTokenizer;
// Activity registration
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "CountWords"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringTokenizer tokenizer = new StringTokenizer(input);
return tokenizer.countTokens();
};
}
})
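The CountWords activity uses Java's StringTokenizer, which splits on runs of whitespace. The equivalent logic in Python, for comparison:

```python
def count_words(text: str) -> int:
    # str.split() with no arguments splits on runs of whitespace,
    # like StringTokenizer with its default delimiters.
    return len(text.split())

print(count_words("The quick brown fox jumps over the lazy dog."))  # → 9
```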
Aggregate results activity
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
private readonly ILogger<AggregateResultsActivity> _logger;
public AggregateResultsActivity(ILogger<AggregateResultsActivity> logger)
{
_logger = logger;
}
public override Task<Dictionary<string, int>> RunAsync(
TaskActivityContext context, Dictionary<string, int>[] results)
{
_logger.LogInformation("Aggregating {Count} results", results.Length);
// Combine all results into one aggregated result
var aggregatedResult = new Dictionary<string, int>();
foreach (var result in results)
{
foreach (var kvp in result)
{
aggregatedResult[kvp.Key] = kvp.Value;
}
}
return Task.FromResult(aggregatedResult);
}
}
import { ActivityContext } from "@microsoft/durabletask-js";
const aggregateResults = async (
_ctx: ActivityContext,
results: number[]
): Promise<object> => {
const total = results.reduce((sum, val) => sum + val, 0);
return {
totalItems: results.length,
sum: total,
average: results.length > 0 ? total / results.length : 0,
};
};
Unlike orchestrators, activities can perform I/O operations like HTTP calls, database queries, and file access.
from durabletask import task
def aggregate_results(ctx: task.ActivityContext, results: list) -> dict:
"""Activity that aggregates results from multiple work items."""
sum_result = sum(item["result"] for item in results)
return {
"total_items": len(results),
"sum": sum_result,
"average": sum_result / len(results) if results else 0
}
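Outside an orchestration, you can exercise the same aggregation math directly. The following is a standalone replica of the logic above (the real activity also receives a task.ActivityContext):

```python
def aggregate(results: list) -> dict:
    # Same math as aggregate_results, minus the activity context parameter.
    total = sum(item["result"] for item in results)
    return {
        "total_items": len(results),
        "sum": total,
        "average": total / len(results) if results else 0,
    }

print(aggregate([{"item": 2, "result": 4}, {"item": 3, "result": 9}]))
# → {'total_items': 2, 'sum': 13, 'average': 6.5}
```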
In the Java sample, the orchestrator aggregates results after ctx.allOf(tasks).await() returns.
Run the sample
Start the orchestration on Windows by sending the following HTTP POST request:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
Alternatively, on a Linux function app, start the orchestration by sending the following HTTP POST request. (Python function apps on App Service run only on Linux.)
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Note
The HttpStart function expects JSON. Include the Content-Type: application/json header, and encode the directory path as a JSON string. The HTTP snippet assumes host.json has an entry that removes the default api/ prefix from all HTTP trigger function URLs. Find the markup for this configuration in the sample host.json file.
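Encoding the directory path as a JSON string is what json.dumps does by default; note that the result matches the 20-byte body in the request above, backslash escapes included:

```python
import json

# Encode a Windows directory path as the JSON string body for the POST request.
payload = json.dumps("D:\\home\\LogFiles")
print(payload)       # → "D:\\home\\LogFiles"
print(len(payload))  # → 20  (matches the Content-Length header above)
```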
This HTTP request triggers the E2_BackupSiteContent orchestrator and passes the string D:\home\LogFiles as a parameter. The response has a link to check the status of the backup operation:
HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
(...trimmed...)
Depending on the number of log files in your function app, this operation can take several minutes to finish. Get the latest status by querying the URL in the Location header of the previous HTTP 202 response:
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}
In this case, the function is still running. The response shows the input saved in the orchestrator state and the last updated time. Use the Location header value to poll for completion. When the status is "Completed", the response resembles the following example:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
The response shows that the orchestration is complete and how long it took to finish. The output field indicates that the orchestration uploaded about 450 KB of logs.
To run the example:
Start the Durable Task Scheduler emulator for local development.
docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
Start the worker to register the orchestrator and activities.
Run the client to schedule an orchestration with a list of work items:
// Schedule the orchestration with a list of work items
var workItems = new List<string> { "item1", "item2", "item3", "item4", "item5" };
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
nameof(ParallelProcessingOrchestration), workItems);
// Wait for completion
var result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true);
Console.WriteLine($"Result: {result.ReadOutputAs<Dictionary<string, int>>().Count} items processed");
import {
DurableTaskAzureManagedClientBuilder,
} from "@microsoft/durabletask-js-azuremanaged";
const connectionString =
process.env.DURABLE_TASK_SCHEDULER_CONNECTION_STRING ||
"Endpoint=http://localhost:8080;Authentication=None;TaskHub=default";
const client = new DurableTaskAzureManagedClientBuilder()
.connectionString(connectionString)
.build();
const workItems = ["item1", "item2", "item3", "item4", "item5"];
const instanceId = await client.scheduleNewOrchestration(fanOutFanInOrchestrator, workItems);
const state = await client.waitForOrchestrationCompletion(instanceId, true, 30);
console.log(`Result: ${state?.serializedOutput}`);
Create the DurableTaskAzureManagedClientBuilder by using a connection string to the Durable Task Scheduler. Use scheduleNewOrchestration to start an orchestration, and use waitForOrchestrationCompletion to wait for completion.
# Schedule the orchestration with a list of work items
work_items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
instance_id = client.schedule_new_orchestration(fan_out_fan_in_orchestrator, input=work_items)
# Wait for completion
result = client.wait_for_orchestration_completion(instance_id, timeout=60)
print(f"Result: {result.serialized_output}")
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
// Schedule the orchestration with a list of strings
List<String> sentences = Arrays.asList(
"Hello, world!",
"The quick brown fox jumps over the lazy dog.",
"Always remember that you are absolutely unique.");
String instanceId = client.scheduleNewOrchestrationInstance(
"FanOutFanIn_WordCount",
new NewOrchestrationInstanceOptions().setInput(sentences));
// Wait for completion
OrchestrationMetadata result = client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(30), true);
System.out.println("Total word count: " + result.readOutputAs(int.class));
Next steps
This sample shows the fan-out/fan-in pattern. The next sample shows how to implement the monitor pattern with durable timers.