Compartir a través de


Escenario de distribución ramificada de entrada y salida

La distribución ramificada de entrada y salida ejecuta varias actividades en paralelo y, a continuación, agrega los resultados. En este artículo se muestra cómo implementar el patrón mediante los SDK de Durable Task para .NET, JavaScript, Python y Java.

Información general del escenario

En este ejemplo, las funciones cargan todos los archivos en un directorio especificado (recursivamente) en Blob Storage. También cuentan el número total de bytes cargados.

Una sola función puede controlar todo, pero no es escalable. Una ejecución de una sola función se ejecuta en una máquina virtual (VM), por lo que el rendimiento se limita a esa máquina virtual. La confiabilidad es otra preocupación. Si se produce un error en el proceso a mitad del proceso o tarda más de cinco minutos, la copia de seguridad puede finalizar en un estado completado parcialmente. A continuación, reinicie la copia de seguridad.

Un enfoque más sólido consiste en usar dos funciones independientes: una enumera los archivos y agrega nombres de archivo a una cola, y la otra lee de la cola y carga los archivos en Blob Storage. Este enfoque mejora el rendimiento y la confiabilidad, pero debe configurar y administrar la cola. Lo más importante es que este enfoque agregue complejidad para la administración y coordinación de estados, como notificar el número total de bytes cargados.

Durable Functions proporciona todas estas ventajas con poca sobrecarga.

En el ejemplo siguiente, el orquestador procesa varios elementos de trabajo en paralelo y, a continuación, agrega los resultados. Este patrón es útil cuando es necesario:

  • Procesar un lote de elementos en los que cada elemento se puede procesar de forma independiente
  • Distribuir el trabajo entre varias máquinas para mejorar el rendimiento
  • Agregar los resultados de todas las operaciones paralelas

Sin el patrón de distribución ramificada de entrada y salida, los elementos se procesan secuencialmente, lo que limita el rendimiento, o se gestiona la propia lógica de colas y coordinación, lo que agrega complejidad.

Los SDK de Durable Task controlan la paralelización y la coordinación, por lo que el patrón es sencillo de implementar.

Funciones

En este artículo se describen las funciones de la aplicación de ejemplo:

  • E2_BackupSiteContent: una función de orquestador que llama E2_GetFileList a para obtener una lista de archivos de los que se va a realizar una copia de seguridad y, a continuación, llama E2_CopyFileToBlob a para cada archivo.
  • E2_GetFileList: función de actividad que devuelve una lista de archivos de un directorio.
  • E2_CopyFileToBlob: función de actividad que realiza una copia de seguridad de un único archivo en Azure Blob Storage.

En este artículo se describen los componentes del código de ejemplo:

  • ParallelProcessingOrchestration, fanOutFanInOrchestrator, fan_out_fan_in_orchestrator o FanOutFanIn_WordCount: orquestador que efectúa una distribución ramificada de salida del trabajo a varias actividades en paralelo, espera a que finalicen todas las actividades y, a continuación, efectúa una distribución ramificada de entrada agregando los resultados.
  • ProcessWorkItemActivity, processWorkItem, process_work_itemo CountWords: actividad que procesa un único elemento de trabajo.
  • AggregateResultsActivity, aggregateResultso aggregate_results: actividad que agrega resultados de todas las operaciones paralelas.

Orchestrator

Esta función de orquestador hace lo siguiente:

  1. Toma rootDirectory como entrada.
  2. Llama a una función para obtener una lista recursiva de los archivos de rootDirectory.
  3. Realiza llamadas de función paralelas para cargar cada archivo en Azure Blob Storage.
  4. Espera a que finalicen todas las cargas.
  5. Devuelve el número total de bytes cargados en Azure Blob Storage.

Este es el código que implementa la función de orquestador:

[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

Observe la línea await Task.WhenAll(tasks);. El código no espera las llamadas individuales a E2_CopyFileToBlob, por lo que se ejecutan en paralelo. Cuando el orquestador pasa la matriz de tareas a Task.WhenAll, devuelve una tarea que no se completa hasta que se completen todas las operaciones de copia. Si está familiarizado con la biblioteca paralela de tareas (TPL) en .NET, este patrón es familiar. La diferencia es que estas tareas podrían ejecutarse simultáneamente en varias máquinas virtuales y la extensión Durable Functions garantiza que la ejecución de un extremo a otro sea resistente al reciclaje de procesos.

Una vez que el orquestador espera Task.WhenAll, todas las llamadas a la función se completan y devuelven valores. Cada llamada a E2_CopyFileToBlob devuelve el número de bytes cargados. Calcule el total agregando los valores devueltos.

El orquestador hace lo siguiente:

  1. Toma una lista de elementos de trabajo como entrada.
  2. Efectúa una distribución ramificada de salida creando una tarea para cada elemento de trabajo y procesándola en paralelo.
  3. Espera a que se completen todas las tareas paralelas.
  4. Efectúa una distribución ramificada de entrada agregando los resultados.
using Microsoft.DurableTask;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class ParallelProcessingOrchestration : TaskOrchestrator<List<string>, Dictionary<string, int>>
{
    public override async Task<Dictionary<string, int>> RunAsync(
        TaskOrchestrationContext context, List<string> workItems)
    {
        // Step 1: Fan-out by creating a task for each work item in parallel
        var processingTasks = new List<Task<Dictionary<string, int>>>();

        foreach (string workItem in workItems)
        {
            // Create a task for each work item (fan-out)
            Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
                nameof(ProcessWorkItemActivity), workItem);
            processingTasks.Add(task);
        }

        // Step 2: Wait for all parallel tasks to complete
        Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

        // Step 3: Fan-in by aggregating all results
        Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
            nameof(AggregateResultsActivity), results);

        return aggregatedResults;
    }
}

Use Task.WhenAll() para esperar a que se completen todas las tareas paralelas. El SDK de Durable Task garantiza que las tareas se pueden ejecutar en varias máquinas simultáneamente y que la ejecución es resistente a los reinicios del proceso.

Activities

Las funciones auxiliares de actividad son funciones normales que utilizan la vinculación activityTrigger.

Función de actividad E2_GetFileList

[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

Nota:

No coloque este código en la función de orquestador. Las funciones de orquestador no deben realizar E/S, incluido el acceso al sistema de archivos local. Para más información, consulte el Restricciones de código de las funciones de orquestador.

Función de actividad de E2_CopyFileToBlob

[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

Nota:

Para ejecutar el código de ejemplo, instale el paquete NuGet Microsoft.Azure.WebJobs.Extensions.Storage.

La función usa características de enlace de Azure Functions como el parámetro /Binder. No necesita esos detalles para este tutorial.

La implementación carga el archivo desde el disco y transmite de forma asincrónica el contenido a un blob con el mismo nombre en el backups contenedor. La función devuelve el número de bytes copiados en el almacenamiento. El orquestador usa ese valor para calcular la suma agregada.

Nota:

En este ejemplo, las operaciones de E/S se mueven a una función activityTrigger. El trabajo se puede ejecutar en varias máquinas y admite puntos de control de progreso. Si finaliza el proceso del host, sabrá qué cargas se han completado.

Las actividades realizan el trabajo. A diferencia de los orquestadores, las actividades pueden realizar operaciones de E/S y lógica no determinista.

Actividad del elemento de trabajo en el proceso

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    private readonly ILogger<ProcessWorkItemActivity> _logger;

    public ProcessWorkItemActivity(ILogger<ProcessWorkItemActivity> logger)
    {
        _logger = logger;
    }

    public override Task<Dictionary<string, int>> RunAsync(TaskActivityContext context, string workItem)
    {
        _logger.LogInformation("Processing work item: {WorkItem}", workItem);

        // Process the work item (this is where you do the actual work)
        var result = new Dictionary<string, int>
        {
            { workItem, workItem.Length }
        };

        return Task.FromResult(result);
    }
}

Actividad de resultados agregados

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    private readonly ILogger<AggregateResultsActivity> _logger;

    public AggregateResultsActivity(ILogger<AggregateResultsActivity> logger)
    {
        _logger = logger;
    }

    public override Task<Dictionary<string, int>> RunAsync(
        TaskActivityContext context, Dictionary<string, int>[] results)
    {
        _logger.LogInformation("Aggregating {Count} results", results.Length);

        // Combine all results into one aggregated result
        var aggregatedResult = new Dictionary<string, int>();

        foreach (var result in results)
        {
            foreach (var kvp in result)
            {
                aggregatedResult[kvp.Key] = kvp.Value;
            }
        }

        return Task.FromResult(aggregatedResult);
    }
}

Ejecución del ejemplo

Inicie la orquestación en Windows enviando la siguiente solicitud HTTP POST:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

Como alternativa, en una aplicación de funciones de Linux, inicie la orquestación mediante el envío de la siguiente solicitud HTTP POST. Python se ejecuta actualmente en Linux para App Service:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

Nota:

La HttpStart función espera JSON. Incluya el Content-Type: application/json encabezado y codifique la ruta de acceso del directorio como una cadena JSON. El fragmento de código HTTP asume que host.json tiene una entrada que elimina el prefijo api/ predeterminado de todas las direcciones URL de las funciones desencadenadoras HTTP. Busque el marcado de esta configuración en el archivo host.json de ejemplo.

Esta solicitud HTTP desencadena el orquestador E2_BackupSiteContent y pasa la cadena D:\home\LogFiles como un parámetro. La respuesta tiene un vínculo para comprobar el estado de la operación de copia de seguridad:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

En función del número de archivos de registro de la aplicación de funciones, esta operación puede tardar varios minutos en finalizar. Obtenga el estado más reciente consultando la dirección URL en el Location encabezado de la respuesta HTTP 202 anterior:

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

En este caso, la función todavía se está ejecutando. La respuesta muestra la entrada guardada en el estado del orquestador y la hora de la última actualización. Utilice el valor del encabezado Location para sondear la finalización. Cuando el estado es "Completed", la respuesta es similar al ejemplo siguiente:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

La respuesta muestra que la orquestación está completa y el tiempo aproximado para finalizar. El output campo indica que la orquestación cargó aproximadamente 450 KB de registros.

Para ejecutar el ejemplo:

  1. Inicie el emulador del Planificador de Tareas Duraderas para el desarrollo local.

    docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
    
  2. Inicie el proceso de trabajo para registrar el orquestador y las actividades.

  3. Ejecute el cliente para programar una orquestación con una lista de elementos de trabajo:

// Schedule the orchestration with a list of work items
var workItems = new List<string> { "item1", "item2", "item3", "item4", "item5" };
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    nameof(ParallelProcessingOrchestration), workItems);

// Wait for completion
var result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true);
Console.WriteLine($"Result: {result.ReadOutputAs<Dictionary<string, int>>().Count} items processed");

Pasos siguientes

En este ejemplo se muestra el patrón de distribución ramificada de entrada y salida. En el ejemplo siguiente se muestra cómo implementar el patrón de supervisión con temporizadores duraderos.

En este artículo se muestra el patrón fan-out/fan-in. Explore más patrones y características:

Para ver ejemplos del SDK de JavaScript, consulte ejemplos del SDK de JavaScript de Durable Task.