Compartilhar via


Cenário de fan-out/fan-in

O fan-out/fan-in executa várias atividades em paralelo e agrega os resultados. Este artigo mostra como implementar o padrão usando os SDKs de Tarefa Durável para .NET, JavaScript, Python e Java.

Visão geral do cenário

Neste exemplo, as funções carregam todos os arquivos em um diretório especificado (recursivamente) para o armazenamento de blobs. Eles também contam o número total de bytes carregados.

Uma única função pode tratar tudo, mas ela não é dimensionada. Uma única execução de função é executada em uma VM (máquina virtual), portanto, a taxa de transferência é limitada a essa VM. A confiabilidade é outra preocupação. Se o processo falhar no meio do caminho ou levar mais de cinco minutos, o backup poderá terminar em um estado parcialmente concluído. Em seguida, reinicie o backup.

Uma abordagem mais robusta é usar duas funções separadas: uma enumera os arquivos e adiciona nomes de arquivo a uma fila e a outra lê da fila e carrega os arquivos no armazenamento de blobs. Essa abordagem melhora a taxa de transferência e a confiabilidade, mas você precisa configurar e gerenciar a fila. Mais importante, essa abordagem adiciona complexidade ao gerenciamento e coordenação de estado, como relatar o número total de bytes carregados.

Durable Functions fornece todos esses benefícios com pouca sobrecarga.

No exemplo a seguir, o orquestrador processa vários itens de trabalho em paralelo e, em seguida, agrega os resultados. Esse padrão é útil quando você precisa:

  • Processar um lote de itens em que cada item pode ser processado independentemente
  • Distribuir o trabalho em vários computadores para obter uma melhor taxa de transferência
  • Agregar resultados de todas as operações paralelas

Sem o padrão fan-out/fan-in, você processa itens de forma sequencial, limitando a taxa de transferência, ou gerencia sua própria lógica de enfileiramento e coordenação, adicionando complexidade.

Os SDKs de Tarefa Durável lidam com paralelização e coordenação, portanto, o padrão é simples de implementar.

As funções

Este artigo descreve as funções no aplicativo de exemplo:

  • E2_BackupSiteContent: uma função de orquestrador que chama E2_GetFileList para obter uma lista de arquivos para fazer backup e, em seguida, chama E2_CopyFileToBlob para cada arquivo.
  • E2_GetFileList: uma função de atividade que retorna uma lista de arquivos em um diretório.
  • E2_CopyFileToBlob: uma função de atividade que faz backup de um único arquivo para Azure Blob Storage.

Este artigo descreve os componentes no código de exemplo:

  • ParallelProcessingOrchestration, fanOutFanInOrchestrator, fan_out_fan_in_orchestrator, ou FanOutFanIn_WordCount: um orquestrador que distribui o trabalho para várias atividades em paralelo, aguarda a conclusão de todas as atividades e então agrega os resultados.
  • ProcessWorkItemActivity, processWorkItemou process_work_itemCountWords: uma atividade que processa um único item de trabalho.
  • AggregateResultsActivity, aggregateResultsou aggregate_results: uma atividade que agrega resultados de todas as operações paralelas.

Orquestrador

Essa função de orquestrador faz o seguinte:

  1. Usa rootDirectory como entrada.
  2. Chama uma função para obter uma lista recursiva de arquivos em rootDirectory.
  3. Faz chamadas de função paralelas para carregar cada arquivo para Azure Blob Storage.
  4. Aguarda que todos os uploads sejam concluídos.
  5. Retorna o número total de bytes carregados para Azure Blob Storage.

Este é o código que implementa a função de orquestrador:

Modelo isolado
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;

namespace SampleApp;

public static class BackupSiteContent
{
    [Function("E2_BackupSiteContent")]
    public static async Task<long> Run(
        [OrchestrationTrigger] TaskOrchestrationContext context)
    {
        string rootDirectory = context.GetInput<string>()?.Trim();
        if (string.IsNullOrEmpty(rootDirectory))
        {
            rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location)!.FullName;
        }

        string[] files = await context.CallActivityAsync<string[]>("E2_GetFileList", rootDirectory);

        Task<long>[] tasks = files
            .Select(file => context.CallActivityAsync<long>("E2_CopyFileToBlob", file))
            .ToArray();

        long[] results = await Task.WhenAll(tasks);
        return results.Sum();
    }
}

Observe a linha await Task.WhenAll(tasks);. O código não aguarda as chamadas individuais a E2_CopyFileToBlob, portanto, elas são executadas em paralelo. Quando o orquestrador passa a matriz de tarefas para Task.WhenAll, ele retorna uma tarefa que só é concluída quando todas as operações de cópia são concluídas. Se você estiver familiarizado com a TPL (Biblioteca Paralela de Tarefas) em .NET, esse padrão será familiar. A diferença é que essas tarefas podem estar em execução em várias máquinas virtuais simultaneamente e a extensão Durable Functions garante que a execução de ponta a ponta seja resiliente à reciclagem de processos.

Após o orquestrador aguardar Task.WhenAll, todas as chamadas de função são concluídas e retornam valores. Cada chamada para E2_CopyFileToBlob retorna o número de bytes enviados. Calcule o total adicionando os valores retornados.


Modelo em processo
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

Observação

O exemplo de modelo em processo utiliza pacotes obsoletos em processo. O código anterior mostra o modelo de trabalho isolado do .NET recomendado.


O orquestrador faz o seguinte:

  1. Usa uma lista de itens de trabalho como entrada.
  2. Realiza fan-out, criando uma tarefa para cada item de trabalho e processando-os em paralelo.
  3. Aguarda a conclusão de todas as tarefas paralelas.
  4. Realiza fan-in, agregando os resultados.
using Microsoft.DurableTask;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class ParallelProcessingOrchestration : TaskOrchestrator<List<string>, Dictionary<string, int>>
{
    public override async Task<Dictionary<string, int>> RunAsync(
        TaskOrchestrationContext context, List<string> workItems)
    {
        // Step 1: Fan-out by creating a task for each work item in parallel
        var processingTasks = new List<Task<Dictionary<string, int>>>();

        foreach (string workItem in workItems)
        {
            // Create a task for each work item (fan-out)
            Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
                nameof(ProcessWorkItemActivity), workItem);
            processingTasks.Add(task);
        }

        // Step 2: Wait for all parallel tasks to complete
        Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

        // Step 3: Fan-in by aggregating all results
        Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
            nameof(AggregateResultsActivity), results);

        return aggregatedResults;
    }
}

Use Task.WhenAll() para aguardar a conclusão de todas as tarefas paralelas. O SDK da Tarefa Durável garante que as tarefas possam ser executadas em várias máquinas simultaneamente e que a execução seja resiliente a reinicializações de processos.

Activities

As funções de atividade auxiliar são funções regulares que usam a activityTrigger vinculação.

Função de atividade de E2_GetFileList

Modelo isolado
using System.IO;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace SampleApp;

public static class BackupSiteContent
{
    [Function("E2_GetFileList")]
    public static string[] GetFileList(
        [ActivityTrigger] string rootDirectory,
        FunctionContext executionContext)
    {
        ILogger logger = executionContext.GetLogger("E2_GetFileList");
        logger.LogInformation("Searching for files under '{RootDirectory}'...", rootDirectory);

        string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
        logger.LogInformation("Found {FileCount} file(s) under {RootDirectory}.", files.Length, rootDirectory);

        return files;
    }
}

Modelo em processo
[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

Observação

Não coloque esse código na função de orquestrador. As funções de orquestrador não devem fazer E/S, incluindo o acesso ao sistema de arquivos local. Para obter mais informações, confira Restrições de código na função de orquestrador.

Função de atividade de E2_CopyFileToBlob

Modelo isolado

Observação

Para executar o código de exemplo, instale o pacote NuGet Azure.Storage.Blobs.

using System;
using System.IO;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace SampleApp;

public static class BackupSiteContent
{
    [Function("E2_CopyFileToBlob")]
    public static async Task<long> CopyFileToBlob(
        [ActivityTrigger] string filePath,
        FunctionContext executionContext)
    {
        ILogger logger = executionContext.GetLogger("E2_CopyFileToBlob");
        long byteCount = new FileInfo(filePath).Length;

        string blobPath = filePath
            .Substring(Path.GetPathRoot(filePath)!.Length)
            .Replace('\\', '/');
        string outputLocation = $"backups/{blobPath}";

        string? connectionString = Environment.GetEnvironmentVariable("AzureWebJobsStorage");
        if (string.IsNullOrEmpty(connectionString))
        {
            throw new InvalidOperationException("AzureWebJobsStorage is not configured.");
        }

        BlobContainerClient containerClient = new(connectionString, "backups");
        await containerClient.CreateIfNotExistsAsync();
        BlobClient blobClient = containerClient.GetBlobClient(blobPath);

        logger.LogInformation("Copying '{FilePath}' to '{OutputLocation}'. Total bytes = {ByteCount}.", filePath, outputLocation, byteCount);

        await using Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read);
        await blobClient.UploadAsync(source, overwrite: true);

        return byteCount;
    }
}

Modelo em processo
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

Observação

O exemplo de modelo em processo requer o pacote NuGet Microsoft.Azure.WebJobs.Extensions.Storage e usa recursos de associação do Azure Functions como o Binder parâmetro.


A implementação carrega o arquivo do disco e transmite de forma assíncrona o conteúdo para um blob com o mesmo nome no backups contêiner. A função retorna o número de bytes copiados para o armazenamento. O orquestrador usa esse valor para calcular a soma agregada.

Observação

Este exemplo move as operações de E/S para uma activityTrigger função. O trabalho pode ser executado em vários computadores e dá suporte à checagem de progresso. Se o processo de host terminar, você saberá quais uploads estão concluídos.

As atividades fazem o trabalho. Ao contrário dos orquestradores, as atividades podem executar operações de E/S e lógica não determinística.

Processar atividade do item de trabalho

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    private readonly ILogger<ProcessWorkItemActivity> _logger;

    public ProcessWorkItemActivity(ILogger<ProcessWorkItemActivity> logger)
    {
        _logger = logger;
    }

    public override Task<Dictionary<string, int>> RunAsync(TaskActivityContext context, string workItem)
    {
        _logger.LogInformation("Processing work item: {WorkItem}", workItem);

        // Process the work item (this is where you do the actual work)
        var result = new Dictionary<string, int>
        {
            { workItem, workItem.Length }
        };

        return Task.FromResult(result);
    }
}

Atividade de resultados agregados

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    private readonly ILogger<AggregateResultsActivity> _logger;

    public AggregateResultsActivity(ILogger<AggregateResultsActivity> logger)
    {
        _logger = logger;
    }

    public override Task<Dictionary<string, int>> RunAsync(
        TaskActivityContext context, Dictionary<string, int>[] results)
    {
        _logger.LogInformation("Aggregating {Count} results", results.Length);

        // Combine all results into one aggregated result
        var aggregatedResult = new Dictionary<string, int>();

        foreach (var result in results)
        {
            foreach (var kvp in result)
            {
                aggregatedResult[kvp.Key] = kvp.Value;
            }
        }

        return Task.FromResult(aggregatedResult);
    }
}

Executar o exemplo

Inicie a orquestração no Windows enviando a seguinte solicitação HTTP POST:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

Como alternativa, em um aplicativo de funções do Linux, inicie a orquestração enviando a seguinte solicitação HTTP POST. Python atualmente é executado no Linux para Serviço de Aplicativo:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

Observação

A HttpStart função espera JSON. Inclua o Content-Type: application/json cabeçalho e codifique o caminho do diretório como uma cadeia de caracteres JSON. O extrato HTTP pressupõe que host.json tenha uma entrada que remove o prefixo padrão api/ de todas as URLs de função de gatilho HTTP. Encontre a marcação para esta configuração no arquivo host.json de exemplo.

Esta solicitação HTTP dispara o orquestrador E2_BackupSiteContent e passa a cadeia de caracteres D:\home\LogFiles como um parâmetro. A resposta tem um link para verificar o status da operação de backup:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

Dependendo do número de arquivos de log em seu aplicativo de funções, essa operação pode levar vários minutos para ser concluída. Obtenha o status mais recente consultando a URL no Location cabeçalho da resposta HTTP 202 anterior:

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

Nesse caso, a função ainda está sendo executada. A resposta mostra a entrada que foi salva no estado do orquestrador e a hora da última atualização. Use o valor do cabeçalho Location para sondar a conclusão. Quando o status é "Concluído", a resposta se assemelha ao seguinte exemplo:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

A resposta mostra que a orquestração foi concluída e o tempo aproximado para concluir. O campo output indica que foram carregados cerca de 450 KB de registros pela orquestração.

Para executar o exemplo:

  1. Inicie o emulador do Agendador de Tarefas Duráveis para desenvolvimento local.

    docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
    
  2. Inicia o worker para registrar o orquestrador e as atividades.

  3. Execute o cliente para agendar uma orquestração com uma lista de itens de trabalho:

// Schedule the orchestration with a list of work items
var workItems = new List<string> { "item1", "item2", "item3", "item4", "item5" };
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    nameof(ParallelProcessingOrchestration), workItems);

// Wait for completion
var result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true);
Console.WriteLine($"Result: {result.ReadOutputAs<Dictionary<string, int>>().Count} items processed");

Próximas Etapas 

Este exemplo mostra o padrão fan-out/fan-in. O exemplo a seguir mostra como implementar o padrão de monitor com temporizadores duráveis.

Este artigo demonstra o padrão fan-out/fan-in. Explore mais padrões e recursos:

Para obter exemplos do SDK do JavaScript, consulte os exemplos do SDK do JavaScript da Tarefa Durável.