Fan-out/fan-in ejecuta varias funciones en paralelo y, a continuación, agrega los resultados. En este artículo se muestra un ejemplo que usa Durable Functions para realizar copias de seguridad de parte o todo el contenido del sitio de una aplicación en Azure Storage.
Prerrequisitos
Modelo de programación V3
Modelo de programación V4
La distribución ramificada de entrada y salida ejecuta varias actividades en paralelo y, a continuación, agrega los resultados. En este artículo se muestra cómo implementar el patrón mediante los SDK de Durable Task para .NET, JavaScript, Python y Java.
Información general del escenario
En este ejemplo, las funciones cargan todos los archivos en un directorio especificado (recursivamente) en Blob Storage. También cuentan el número total de bytes cargados.
Una sola función puede controlar todo, pero no es escalable. Una ejecución de una sola función se ejecuta en una máquina virtual (VM), por lo que el rendimiento se limita a esa máquina virtual. La confiabilidad es otra preocupación. Si se produce un error en el proceso a mitad del proceso o tarda más de cinco minutos, la copia de seguridad puede finalizar en un estado completado parcialmente. A continuación, reinicie la copia de seguridad.
Un enfoque más sólido consiste en usar dos funciones independientes: una enumera los archivos y agrega nombres de archivo a una cola, y la otra lee de la cola y carga los archivos en Blob Storage. Este enfoque mejora el rendimiento y la confiabilidad, pero debe configurar y administrar la cola. Lo más importante es que este enfoque agregue complejidad para la administración y coordinación de estados, como notificar el número total de bytes cargados.
Durable Functions proporciona todas estas ventajas con poca sobrecarga.
En el ejemplo siguiente, el orquestador procesa varios elementos de trabajo en paralelo y, a continuación, agrega los resultados. Este patrón es útil cuando es necesario:
- Procesar un lote de elementos en los que cada elemento se puede procesar de forma independiente
- Distribuir el trabajo entre varias máquinas para mejorar el rendimiento
- Agregar los resultados de todas las operaciones paralelas
Sin el patrón de distribución ramificada de entrada y salida, los elementos se procesan secuencialmente, lo que limita el rendimiento, o se gestiona la propia lógica de colas y coordinación, lo que agrega complejidad.
Los SDK de Durable Task controlan la paralelización y la coordinación, por lo que el patrón es sencillo de implementar.
Funciones
En este artículo se describen las funciones de la aplicación de ejemplo:
-
E2_BackupSiteContent: una función de orquestador que llama E2_GetFileList a para obtener una lista de archivos de los que se va a realizar una copia de seguridad y, a continuación, llama E2_CopyFileToBlob a para cada archivo.
-
E2_GetFileList: función de actividad que devuelve una lista de archivos de un directorio.
-
E2_CopyFileToBlob: función de actividad que realiza una copia de seguridad de un único archivo en Azure Blob Storage.
En este artículo se describen los componentes del código de ejemplo:
-
ParallelProcessingOrchestration, fanOutFanInOrchestrator, fan_out_fan_in_orchestrator o FanOutFanIn_WordCount: orquestador que efectúa una distribución ramificada de salida del trabajo a varias actividades en paralelo, espera a que finalicen todas las actividades y, a continuación, efectúa una distribución ramificada de entrada agregando los resultados.
-
ProcessWorkItemActivity, processWorkItem, process_work_itemo CountWords: actividad que procesa un único elemento de trabajo.
-
AggregateResultsActivity, aggregateResultso aggregate_results: actividad que agrega resultados de todas las operaciones paralelas.
Orchestrator
Esta función de orquestador hace lo siguiente:
- Toma
rootDirectory como entrada.
- Llama a una función para obtener una lista recursiva de los archivos de
rootDirectory.
- Realiza llamadas de función paralelas para cargar cada archivo en Azure Blob Storage.
- Espera a que finalicen todas las cargas.
- Devuelve el número total de bytes cargados en Azure Blob Storage.
Este es el código que implementa la función de orquestador:
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Observe la línea await Task.WhenAll(tasks);. El código no espera las llamadas individuales a E2_CopyFileToBlob, por lo que se ejecutan en paralelo. Cuando el orquestador pasa la matriz de tareas a Task.WhenAll, devuelve una tarea que no se completa hasta que se completen todas las operaciones de copia. Si está familiarizado con la biblioteca paralela de tareas (TPL) en .NET, este patrón es familiar. La diferencia es que estas tareas podrían ejecutarse simultáneamente en varias máquinas virtuales y la extensión Durable Functions garantiza que la ejecución de un extremo a otro sea resistente al reciclaje de procesos.
Una vez que el orquestador espera Task.WhenAll, todas las llamadas a la función se completan y devuelven valores. Cada llamada a E2_CopyFileToBlob devuelve el número de bytes cargados. Calcule el total agregando los valores devueltos.
Modelo de programación V3
La función utiliza la norma function.json para las funciones de orquestador.
{
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
],
"disabled": false
}
Este es el código que implementa la función de orquestador:
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
Observe la línea yield context.df.Task.all(tasks);. El código no produce las llamadas individuales a E2_CopyFileToBlob, por lo que se ejecutan en paralelo. Cuando el orquestador pasa la matriz de tareas a context.df.Task.all, devuelve una tarea que no se completa hasta que se completen todas las operaciones de copia. Si está familiarizado con Promise.all en JavaScript, no le parecerá nada nuevo. La diferencia es que estas tareas podrían ejecutarse simultáneamente en varias máquinas virtuales y la extensión Durable Functions garantiza que la ejecución de un extremo a otro sea resistente al reciclaje de procesos.
Nota:
Aunque las tareas son conceptualmente similares a las promesas de JavaScript, las funciones de orquestador deben usar context.df.Task.all y context.df.Task.any, en lugar de Promise.all y Promise.race, para administrar la paralelización de la tarea.
Una vez que el orquestador produce context.df.Task.all, todas las llamadas de función se completan y devuelven valores. Cada llamada a E2_CopyFileToBlob devuelve el número de bytes cargados, por lo que calcular el recuento total de bytes es cuestión de agregar todos los valores devueltos.
Modelo de programación V4
Este es el código que implementa la función de orquestador:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace(/\\/g, "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
Observe la línea yield context.df.Task.all(tasks);.
No se generó ninguna de las llamadas individuales a la función copyFileToBlob, lo que permite que se ejecuten en paralelo. Cuando se pasa esta matriz de tareas a context.df.Task.all, obtenemos una tarea que no finalizará hasta que se completen todas las operaciones de copia. Si está familiarizado con Promise.all en JavaScript, no le parecerá nada nuevo. La diferencia es que estas tareas podrían ejecutarse simultáneamente en varias máquinas virtuales y la extensión Durable Functions garantiza que la ejecución de un extremo a otro sea resistente al reciclaje de procesos.
Nota:
Aunque las tareas son conceptualmente similares a las promesas de JavaScript, las funciones de orquestador deben usar context.df.Task.all y context.df.Task.any, en lugar de Promise.all y Promise.race, para administrar la paralelización de la tarea.
Después de la generación desde context.df.Task.all, sabemos que todas las llamadas de función han finalizado y nos han devuelto valores. Cada llamada a copyFileToBlob devuelve el número de bytes cargados, por lo que calcular el recuento total de bytes es cuestión de agregar todos los valores devueltos.
La función utiliza la norma function.json para las funciones de orquestador.
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "context",
"type": "orchestrationTrigger",
"direction": "in"
}
]
}
Este es el código que implementa la función de orquestador:
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Observe la línea yield context.task_all(tasks);. El código no produce las llamadas individuales a E2_CopyFileToBlob, por lo que se ejecutan en paralelo. Cuando el orquestador pasa la matriz de tareas a context.task_all, devuelve una tarea que no se completa hasta que se completen todas las operaciones de copia. Si está familiarizado con asyncio.gather en Python, esto no es nuevo para usted. La diferencia es que estas tareas podrían ejecutarse simultáneamente en varias máquinas virtuales y la extensión Durable Functions garantiza que la ejecución de un extremo a otro sea resistente al reciclaje de procesos.
Nota:
Aunque las tareas son conceptualmente similares a Python awaitables, las funciones de orquestador deben usar yield así como las API de context.task_all y context.task_any para administrar la paralelización de tareas.
Una vez que el orquestador produce context.task_all, todas las llamadas de función se completan y devuelven valores. Cada llamada a E2_CopyFileToBlob devuelve el número de bytes cargados, por lo que calcular el recuento total de bytes es cuestión de agregar todos los valores devueltos.
Todavía no hay disponible un ejemplo de PowerShell.
Todavía no hay disponible un ejemplo de Java.
El orquestador hace lo siguiente:
- Toma una lista de elementos de trabajo como entrada.
- Efectúa una distribución ramificada de salida creando una tarea para cada elemento de trabajo y procesándola en paralelo.
- Espera a que se completen todas las tareas paralelas.
- Efectúa una distribución ramificada de entrada agregando los resultados.
using Microsoft.DurableTask;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class ParallelProcessingOrchestration : TaskOrchestrator<List<string>, Dictionary<string, int>>
{
public override async Task<Dictionary<string, int>> RunAsync(
TaskOrchestrationContext context, List<string> workItems)
{
// Step 1: Fan-out by creating a task for each work item in parallel
var processingTasks = new List<Task<Dictionary<string, int>>>();
foreach (string workItem in workItems)
{
// Create a task for each work item (fan-out)
Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
nameof(ProcessWorkItemActivity), workItem);
processingTasks.Add(task);
}
// Step 2: Wait for all parallel tasks to complete
Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
// Step 3: Fan-in by aggregating all results
Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
nameof(AggregateResultsActivity), results);
return aggregatedResults;
}
}
Use Task.WhenAll() para esperar a que se completen todas las tareas paralelas. El SDK de Durable Task garantiza que las tareas se pueden ejecutar en varias máquinas simultáneamente y que la ejecución es resistente a los reinicios del proceso.
import {
OrchestrationContext,
TOrchestrator,
whenAll,
} from "@microsoft/durabletask-js";
const fanOutFanInOrchestrator: TOrchestrator = async function* (
ctx: OrchestrationContext,
workItems: string[]
): any {
// Fan-out: create a task for each work item in parallel
const tasks = workItems.map((item) => ctx.callActivity(processWorkItem, item));
// Wait for all parallel tasks to complete
const results: number[] = yield whenAll(tasks);
// Fan-in: aggregate all results
const aggregatedResult = yield ctx.callActivity(aggregateResults, results);
return aggregatedResult;
};
Use whenAll() para esperar a que se completen todas las tareas paralelas. El SDK de Durable Task garantiza que las tareas se pueden ejecutar en varias máquinas simultáneamente y que la ejecución es resistente a los reinicios del proceso.
from durabletask import task
def fan_out_fan_in_orchestrator(ctx: task.OrchestrationContext, work_items: list) -> dict:
"""Orchestrator that demonstrates fan-out/fan-in pattern."""
# Fan-out: Create a task for each work item
parallel_tasks = []
for item in work_items:
parallel_tasks.append(ctx.call_activity(process_work_item, input=item))
# Wait for all tasks to complete
results = yield task.when_all(parallel_tasks)
# Fan-in: Aggregate all the results
final_result = yield ctx.call_activity(aggregate_results, input=results)
return final_result
Use task.when_all() para esperar a que se completen todas las tareas paralelas. El SDK de Durable Task garantiza que las tareas se pueden ejecutar en varias máquinas simultáneamente y que la ejecución es resistente a los reinicios del proceso.
Este ejemplo está disponible para .NET, JavaScript, Java y Python.
import com.microsoft.durabletask.*;
import java.util.List;
import java.util.stream.Collectors;
DurableTaskGrpcWorker worker = DurableTaskSchedulerWorkerExtensions.createWorkerBuilder(connectionString)
.addOrchestration(new TaskOrchestrationFactory() {
@Override
public String getName() { return "FanOutFanIn_WordCount"; }
@Override
public TaskOrchestration create() {
return ctx -> {
List<?> inputs = ctx.getInput(List.class);
// Fan-out: Create a task for each input item
List<Task<Integer>> tasks = inputs.stream()
.map(input -> ctx.callActivity("CountWords", input.toString(), Integer.class))
.collect(Collectors.toList());
// Wait for all parallel tasks to complete
List<Integer> allResults = ctx.allOf(tasks).await();
// Fan-in: Aggregate results
int totalCount = allResults.stream().mapToInt(Integer::intValue).sum();
ctx.complete(totalCount);
};
}
})
.build();
Use ctx.allOf(tasks).await() para esperar a que se completen todas las tareas paralelas. El SDK de Durable Task garantiza que las tareas se pueden ejecutar en varias máquinas simultáneamente y que la ejecución es resistente a los reinicios del proceso.
Activities
Las funciones auxiliares de actividad son funciones normales que utilizan la vinculación activityTrigger.
Función de actividad E2_GetFileList
[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
[ActivityTrigger] string rootDirectory,
ILogger log)
{
log.LogInformation($"Searching for files under '{rootDirectory}'...");
string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");
return files;
}
Modelo de programación V3
El archivo function.json para E2_GetFileList tiene un aspecto similar al del ejemplo siguiente:
{
"bindings": [
{
"name": "rootDirectory",
"type": "activityTrigger",
"direction": "in"
}
],
"disabled": false
}
Esta es la implementación:
const readdirp = require("readdirp");
module.exports = function (context, rootDirectory) {
context.log(`Searching for files under '${rootDirectory}'...`);
const allFilePaths = [];
readdirp(
{ root: rootDirectory, entryType: "all" },
function (fileInfo) {
if (!fileInfo.stat.isDirectory()) {
allFilePaths.push(fileInfo.fullPath);
}
},
function (err, res) {
if (err) {
throw err;
}
context.log(`Found ${allFilePaths.length} under ${rootDirectory}.`);
context.done(null, allFilePaths);
}
);
};
La función usa el readdirp módulo, versión 2.x, para leer recursivamente la estructura de directorios.
Modelo de programación V4
Esta es la implementación de la getFileList función de actividad:
const df = require("durable-functions");
const readdirp = require("readdirp");
const getFileListActivityName = "getFileList";
df.app.activity(getFileListActivityName, {
handler: async function (rootDirectory, context) {
context.log(`Searching for files under '${rootDirectory}'...`);
const allFilePaths = [];
for await (const entry of readdirp(rootDirectory, { type: "files" })) {
allFilePaths.push(entry.fullPath);
}
context.log(`Found ${allFilePaths.length} under ${rootDirectory}.`);
return allFilePaths;
},
});
La función usa el módulo readdirp (versión 3.x) para leer de forma recursiva la estructura de directorios.
El archivo function.json para E2_GetFileList tiene un aspecto similar al del ejemplo siguiente:
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "rootDirectory",
"type": "activityTrigger",
"direction": "in"
}
]
}
Esta es la implementación:
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
El ejemplo de PowerShell estará disponible próximamente.
Ejemplo de Java próximamente.
Función de actividad de E2_CopyFileToBlob
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Nota:
Para ejecutar el código de ejemplo, instale el paquete NuGet Microsoft.Azure.WebJobs.Extensions.Storage.
La función usa características de enlace de Azure Functions como el parámetro /Binder. No necesita esos detalles para este tutorial.
Modelo de programación V3
El archivo function.json para E2_CopyFileToBlob es igual de simple:
{
"bindings": [
{
"name": "filePath",
"type": "activityTrigger",
"direction": "in"
},
{
"name": "out",
"type": "blob",
"path": "",
"connection": "AzureWebJobsStorage",
"direction": "out"
}
],
"disabled": false
}
La implementación de JavaScript usa el SDK de Azure Storage para Node para cargar los archivos en Azure Blob Storage.
const fs = require("fs");
const path = require("path");
const storage = require("azure-storage");
module.exports = function (context, filePath) {
const container = "backups";
const root = path.parse(filePath).root;
const blobPath = filePath.substring(root.length).replace("\\", "/");
const outputLocation = `backups/${blobPath}`;
const blobService = storage.createBlobService();
blobService.createContainerIfNotExists(container, (error) => {
if (error) {
throw error;
}
fs.stat(filePath, function (error, stats) {
if (error) {
throw error;
}
context.log(
`Copying '${filePath}' to '${outputLocation}'. Total bytes = ${stats.size}.`
);
const readStream = fs.createReadStream(filePath);
blobService.createBlockBlobFromStream(
container,
blobPath,
readStream,
stats.size,
function (error) {
if (error) {
throw error;
}
context.done(null, stats.size);
}
);
});
});
};
Modelo de programación V4
La implementación de JavaScript de copyFileToBlob utiliza una vinculación de salida de Azure Storage para cargar los archivos en Azure Blob Storage.
const df = require("durable-functions");
const fs = require("fs/promises");
const { output } = require("@azure/functions");
const copyFileToBlobActivityName = "copyFileToBlob";
const blobOutput = output.storageBlob({
path: "backups/{backupPath}",
connection: "StorageConnString",
});
df.app.activity(copyFileToBlobActivityName, {
extraOutputs: [blobOutput],
handler: async function ({ backupPath, filePath }, context) {
const outputLocation = `backups/${backupPath}`;
const stats = await fs.stat(filePath);
context.log(`Copying '${filePath}' to '${outputLocation}'. Total bytes = ${stats.size}.`);
const fileContents = await fs.readFile(filePath);
context.extraOutputs.set(blobOutput, fileContents);
return stats.size;
},
});
El archivo function.json para E2_CopyFileToBlob es igual de simple:
{
"scriptFile": "__init__.py",
"bindings": [
{
"name": "filePath",
"type": "activityTrigger",
"direction": "in"
}
]
}
La implementación de Python usa el SDK Azure Storage para Python para cargar los archivos en Azure Blob Storage.
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
El ejemplo de PowerShell estará disponible próximamente.
Ejemplo de Java próximamente.
La implementación carga el archivo desde el disco y transmite de forma asincrónica el contenido a un blob con el mismo nombre en el backups contenedor. La función devuelve el número de bytes copiados en el almacenamiento. El orquestador usa ese valor para calcular la suma agregada.
Nota:
En este ejemplo, las operaciones de E/S se mueven a una función activityTrigger. El trabajo se puede ejecutar en varias máquinas y admite puntos de control de progreso. Si finaliza el proceso del host, sabrá qué cargas se han completado.
Las actividades realizan el trabajo. A diferencia de los orquestadores, las actividades pueden realizar operaciones de E/S y lógica no determinista.
Actividad del elemento de trabajo en el proceso
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
private readonly ILogger<ProcessWorkItemActivity> _logger;
public ProcessWorkItemActivity(ILogger<ProcessWorkItemActivity> logger)
{
_logger = logger;
}
public override Task<Dictionary<string, int>> RunAsync(TaskActivityContext context, string workItem)
{
_logger.LogInformation("Processing work item: {WorkItem}", workItem);
// Process the work item (this is where you do the actual work)
var result = new Dictionary<string, int>
{
{ workItem, workItem.Length }
};
return Task.FromResult(result);
}
}
import { ActivityContext } from "@microsoft/durabletask-js";
const processWorkItem = async (
_ctx: ActivityContext,
item: string
): Promise<number> => {
console.log(`Processing work item: "${item}"`);
return item.length;
};
A diferencia de los orquestadores, las actividades pueden realizar operaciones de E/S como llamadas HTTP, consultas de base de datos y acceso a archivos.
from durabletask import task
def process_work_item(ctx: task.ActivityContext, item: int) -> dict:
"""Activity that processes a single work item."""
# Process the work item (this is where you do the actual work)
result = item * item
return {"item": item, "result": result}
Este ejemplo se muestra para .NET, JavaScript, Java y Python.
import java.util.StringTokenizer;
// Activity registration
.addActivity(new TaskActivityFactory() {
@Override
public String getName() { return "CountWords"; }
@Override
public TaskActivity create() {
return ctx -> {
String input = ctx.getInput(String.class);
StringTokenizer tokenizer = new StringTokenizer(input);
return tokenizer.countTokens();
};
}
})
Actividad de resultados agregados
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
private readonly ILogger<AggregateResultsActivity> _logger;
public AggregateResultsActivity(ILogger<AggregateResultsActivity> logger)
{
_logger = logger;
}
public override Task<Dictionary<string, int>> RunAsync(
TaskActivityContext context, Dictionary<string, int>[] results)
{
_logger.LogInformation("Aggregating {Count} results", results.Length);
// Combine all results into one aggregated result
var aggregatedResult = new Dictionary<string, int>();
foreach (var result in results)
{
foreach (var kvp in result)
{
aggregatedResult[kvp.Key] = kvp.Value;
}
}
return Task.FromResult(aggregatedResult);
}
}
import { ActivityContext } from "@microsoft/durabletask-js";
const aggregateResults = async (
_ctx: ActivityContext,
results: number[]
): Promise<object> => {
const total = results.reduce((sum, val) => sum + val, 0);
return {
totalItems: results.length,
sum: total,
average: results.length > 0 ? total / results.length : 0,
};
};
A diferencia de los orquestadores, las actividades pueden realizar operaciones de E/S como llamadas HTTP, consultas de base de datos y acceso a archivos.
from durabletask import task
def aggregate_results(ctx: task.ActivityContext, results: list) -> dict:
"""Activity that aggregates results from multiple work items."""
sum_result = sum(item["result"] for item in results)
return {
"total_items": len(results),
"sum": sum_result,
"average": sum_result / len(results) if results else 0
}
Este ejemplo se muestra para .NET, JavaScript, Java y Python.
En el ejemplo de Java, el orquestador agrega los resultados después de que ctx.allOf(tasks).await() devuelva.
Ejecución del ejemplo
Inicie la orquestación en Windows enviando la siguiente solicitud HTTP POST:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
Como alternativa, en una aplicación de funciones de Linux, inicie la orquestación mediante el envío de la siguiente solicitud HTTP POST. Python se ejecuta actualmente en Linux para App Service:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Nota:
La HttpStart función espera JSON. Incluya el Content-Type: application/json encabezado y codifique la ruta de acceso del directorio como una cadena JSON. El fragmento de código HTTP asume que host.json tiene una entrada que elimina el prefijo api/ predeterminado de todas las direcciones URL de las funciones desencadenadoras HTTP. Busque el marcado de esta configuración en el archivo host.json de ejemplo.
Esta solicitud HTTP desencadena el orquestador E2_BackupSiteContent y pasa la cadena D:\home\LogFiles como un parámetro. La respuesta tiene un vínculo para comprobar el estado de la operación de copia de seguridad:
HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
(...trimmed...)
En función del número de archivos de registro de la aplicación de funciones, esta operación puede tardar varios minutos en finalizar. Obtenga el estado más reciente consultando la dirección URL en el Location encabezado de la respuesta HTTP 202 anterior:
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}
En este caso, la función todavía se está ejecutando. La respuesta muestra la entrada guardada en el estado del orquestador y la hora de la última actualización. Utilice el valor del encabezado Location para sondear la finalización. Cuando el estado es "Completed", la respuesta es similar al ejemplo siguiente:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
La respuesta muestra que la orquestación está completa y el tiempo aproximado para finalizar. El output campo indica que la orquestación cargó aproximadamente 450 KB de registros.
Para ejecutar el ejemplo:
Inicie el emulador del Planificador de Tareas Duraderas para el desarrollo local.
docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
Inicie el proceso de trabajo para registrar el orquestador y las actividades.
Ejecute el cliente para programar una orquestación con una lista de elementos de trabajo:
// Schedule the orchestration with a list of work items
var workItems = new List<string> { "item1", "item2", "item3", "item4", "item5" };
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
nameof(ParallelProcessingOrchestration), workItems);
// Wait for completion
var result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true);
Console.WriteLine($"Result: {result.ReadOutputAs<Dictionary<string, int>>().Count} items processed");
import {
DurableTaskAzureManagedClientBuilder,
} from "@microsoft/durabletask-js-azuremanaged";
const connectionString =
process.env.DURABLE_TASK_SCHEDULER_CONNECTION_STRING ||
"Endpoint=http://localhost:8080;Authentication=None;TaskHub=default";
const client = new DurableTaskAzureManagedClientBuilder()
.connectionString(connectionString)
.build();
const workItems = ["item1", "item2", "item3", "item4", "item5"];
const instanceId = await client.scheduleNewOrchestration(fanOutFanInOrchestrator, workItems);
const state = await client.waitForOrchestrationCompletion(instanceId, true, 30);
console.log(`Result: ${state?.serializedOutput}`);
Cree el DurableTaskAzureManagedClientBuilder mediante una cadena de conexión al Programador de Tareas Duraderas. Use scheduleNewOrchestration para iniciar una orquestación y use waitForOrchestrationCompletion para esperar la finalización.
# Schedule the orchestration with a list of work items
work_items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
instance_id = client.schedule_new_orchestration(fan_out_fan_in_orchestrator, input=work_items)
# Wait for completion
result = client.wait_for_orchestration_completion(instance_id, timeout=60)
print(f"Result: {result.serialized_output}")
Este ejemplo se muestra para .NET, JavaScript, Java y Python.
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
// Schedule the orchestration with a list of strings
List<String> sentences = Arrays.asList(
"Hello, world!",
"The quick brown fox jumps over the lazy dog.",
"Always remember that you are absolutely unique.");
String instanceId = client.scheduleNewOrchestrationInstance(
"FanOutFanIn_WordCount",
new NewOrchestrationInstanceOptions().setInput(sentences));
// Wait for completion
OrchestrationMetadata result = client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(30), true);
System.out.println("Total word count: " + result.readOutputAs(int.class));
Pasos siguientes
En este ejemplo se muestra el patrón de distribución ramificada de entrada y salida. En el ejemplo siguiente se muestra cómo implementar el patrón de supervisión con temporizadores duraderos.
En este artículo se muestra el patrón fan-out/fan-in. Explore más patrones y características:
Para ver ejemplos del SDK de JavaScript, consulte ejemplos del SDK de JavaScript de Durable Task.