Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Fan-out/fan-in executa várias funções em paralelo e agrega os resultados. Este artigo mostra um exemplo que usa Durable Functions para fazer backup de algum ou todo o conteúdo do site de um aplicativo para Azure Storage.
Pré-requisitos
- Conclua o artigo de início rápido
- Clonar ou baixar o projeto de exemplos do GitHub (usa o modelo em processo)
O fan-out/fan-in executa várias atividades em paralelo e agrega os resultados. Este artigo mostra como implementar o padrão usando os SDKs de Tarefa Durável para .NET, JavaScript, Python e Java.
Visão geral do cenário
Neste exemplo, as funções carregam todos os arquivos em um diretório especificado (recursivamente) para o armazenamento de blobs. Eles também contam o número total de bytes carregados.
Uma única função pode tratar tudo, mas ela não é dimensionada. Uma única execução de função é executada em uma VM (máquina virtual), portanto, a taxa de transferência é limitada a essa VM. A confiabilidade é outra preocupação. Se o processo falhar no meio do caminho ou levar mais de cinco minutos, o backup poderá terminar em um estado parcialmente concluído. Em seguida, reinicie o backup.
Uma abordagem mais robusta é usar duas funções separadas: uma enumera os arquivos e adiciona nomes de arquivo a uma fila e a outra lê da fila e carrega os arquivos no armazenamento de blobs. Essa abordagem melhora a taxa de transferência e a confiabilidade, mas você precisa configurar e gerenciar a fila. Mais importante, essa abordagem adiciona complexidade ao gerenciamento e coordenação de estado, como relatar o número total de bytes carregados.
Durable Functions fornece todos esses benefícios com pouca sobrecarga.
No exemplo a seguir, o orquestrador processa vários itens de trabalho em paralelo e, em seguida, agrega os resultados. Esse padrão é útil quando você precisa:
- Processar um lote de itens em que cada item pode ser processado independentemente
- Distribuir o trabalho em vários computadores para obter uma melhor taxa de transferência
- Agregar resultados de todas as operações paralelas
Sem o padrão fan-out/fan-in, você processa itens de forma sequencial, limitando a taxa de transferência, ou gerencia sua própria lógica de enfileiramento e coordenação, adicionando complexidade.
Os SDKs de Tarefa Durável lidam com paralelização e coordenação, portanto, o padrão é simples de implementar.
As funções
Este artigo descreve as funções no aplicativo de exemplo:
-
E2_BackupSiteContent: uma função de orquestrador que chamaE2_GetFileListpara obter uma lista de arquivos para fazer backup e, em seguida, chamaE2_CopyFileToBlobpara cada arquivo. -
E2_GetFileList: uma função de atividade que retorna uma lista de arquivos em um diretório. -
E2_CopyFileToBlob: uma função de atividade que faz backup de um único arquivo para Azure Blob Storage.
Este artigo descreve os componentes no código de exemplo:
-
ParallelProcessingOrchestration,fanOutFanInOrchestrator,fan_out_fan_in_orchestrator, ouFanOutFanIn_WordCount: um orquestrador que distribui o trabalho para várias atividades em paralelo, aguarda a conclusão de todas as atividades e então agrega os resultados. -
ProcessWorkItemActivity,processWorkItemouprocess_work_itemCountWords: uma atividade que processa um único item de trabalho. -
AggregateResultsActivity,aggregateResultsouaggregate_results: uma atividade que agrega resultados de todas as operações paralelas.
Orquestrador
Essa função de orquestrador faz o seguinte:
- Usa
rootDirectorycomo entrada. - Chama uma função para obter uma lista recursiva de arquivos em
rootDirectory. - Faz chamadas de função paralelas para carregar cada arquivo para Azure Blob Storage.
- Aguarda que todos os uploads sejam concluídos.
- Retorna o número total de bytes carregados para Azure Blob Storage.
Este é o código que implementa a função de orquestrador:
Modelo isolado
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;
namespace SampleApp;
public static class BackupSiteContent
{
[Function("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
string rootDirectory = context.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location)!.FullName;
}
string[] files = await context.CallActivityAsync<string[]>("E2_GetFileList", rootDirectory);
Task<long>[] tasks = files
.Select(file => context.CallActivityAsync<long>("E2_CopyFileToBlob", file))
.ToArray();
long[] results = await Task.WhenAll(tasks);
return results.Sum();
}
}
Observe a linha await Task.WhenAll(tasks);. O código não aguarda as chamadas individuais a E2_CopyFileToBlob, portanto, elas são executadas em paralelo. Quando o orquestrador passa a matriz de tarefas para Task.WhenAll, ele retorna uma tarefa que só é concluída quando todas as operações de cópia são concluídas. Se você estiver familiarizado com a TPL (Biblioteca Paralela de Tarefas) em .NET, esse padrão será familiar. A diferença é que essas tarefas podem estar em execução em várias máquinas virtuais simultaneamente e a extensão Durable Functions garante que a execução de ponta a ponta seja resiliente à reciclagem de processos.
Após o orquestrador aguardar Task.WhenAll, todas as chamadas de função são concluídas e retornam valores. Cada chamada para E2_CopyFileToBlob retorna o número de bytes enviados. Calcule o total adicionando os valores retornados.
Modelo em processo
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Observação
O exemplo de modelo em processo utiliza pacotes obsoletos em processo. O código anterior mostra o modelo de trabalho isolado do .NET recomendado.
O orquestrador faz o seguinte:
- Usa uma lista de itens de trabalho como entrada.
- Realiza fan-out, criando uma tarefa para cada item de trabalho e processando-os em paralelo.
- Aguarda a conclusão de todas as tarefas paralelas.
- Realiza fan-in, agregando os resultados.
using Microsoft.DurableTask;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class ParallelProcessingOrchestration : TaskOrchestrator<List<string>, Dictionary<string, int>>
{
public override async Task<Dictionary<string, int>> RunAsync(
TaskOrchestrationContext context, List<string> workItems)
{
// Step 1: Fan-out by creating a task for each work item in parallel
var processingTasks = new List<Task<Dictionary<string, int>>>();
foreach (string workItem in workItems)
{
// Create a task for each work item (fan-out)
Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
nameof(ProcessWorkItemActivity), workItem);
processingTasks.Add(task);
}
// Step 2: Wait for all parallel tasks to complete
Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);
// Step 3: Fan-in by aggregating all results
Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
nameof(AggregateResultsActivity), results);
return aggregatedResults;
}
}
Use Task.WhenAll() para aguardar a conclusão de todas as tarefas paralelas. O SDK da Tarefa Durável garante que as tarefas possam ser executadas em várias máquinas simultaneamente e que a execução seja resiliente a reinicializações de processos.
Activities
As funções de atividade auxiliar são funções regulares que usam a activityTrigger vinculação.
Função de atividade de E2_GetFileList
Modelo isolado
using System.IO;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
namespace SampleApp;
public static class BackupSiteContent
{
[Function("E2_GetFileList")]
public static string[] GetFileList(
[ActivityTrigger] string rootDirectory,
FunctionContext executionContext)
{
ILogger logger = executionContext.GetLogger("E2_GetFileList");
logger.LogInformation("Searching for files under '{RootDirectory}'...", rootDirectory);
string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
logger.LogInformation("Found {FileCount} file(s) under {RootDirectory}.", files.Length, rootDirectory);
return files;
}
}
Modelo em processo
[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
[ActivityTrigger] string rootDirectory,
ILogger log)
{
log.LogInformation($"Searching for files under '{rootDirectory}'...");
string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");
return files;
}
Observação
Não coloque esse código na função de orquestrador. As funções de orquestrador não devem fazer E/S, incluindo o acesso ao sistema de arquivos local. Para obter mais informações, confira Restrições de código na função de orquestrador.
Função de atividade de E2_CopyFileToBlob
Modelo isolado
Observação
Para executar o código de exemplo, instale o pacote NuGet Azure.Storage.Blobs.
using System;
using System.IO;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
namespace SampleApp;
public static class BackupSiteContent
{
[Function("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
FunctionContext executionContext)
{
ILogger logger = executionContext.GetLogger("E2_CopyFileToBlob");
long byteCount = new FileInfo(filePath).Length;
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath)!.Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
string? connectionString = Environment.GetEnvironmentVariable("AzureWebJobsStorage");
if (string.IsNullOrEmpty(connectionString))
{
throw new InvalidOperationException("AzureWebJobsStorage is not configured.");
}
BlobContainerClient containerClient = new(connectionString, "backups");
await containerClient.CreateIfNotExistsAsync();
BlobClient blobClient = containerClient.GetBlobClient(blobPath);
logger.LogInformation("Copying '{FilePath}' to '{OutputLocation}'. Total bytes = {ByteCount}.", filePath, outputLocation, byteCount);
await using Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read);
await blobClient.UploadAsync(source, overwrite: true);
return byteCount;
}
}
Modelo em processo
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Observação
O exemplo de modelo em processo requer o pacote NuGet Microsoft.Azure.WebJobs.Extensions.Storage e usa recursos de associação do Azure Functions como o Binder parâmetro.
A implementação carrega o arquivo do disco e transmite de forma assíncrona o conteúdo para um blob com o mesmo nome no backups contêiner. A função retorna o número de bytes copiados para o armazenamento. O orquestrador usa esse valor para calcular a soma agregada.
Observação
Este exemplo move as operações de E/S para uma activityTrigger função. O trabalho pode ser executado em vários computadores e dá suporte à checagem de progresso. Se o processo de host terminar, você saberá quais uploads estão concluídos.
As atividades fazem o trabalho. Ao contrário dos orquestradores, as atividades podem executar operações de E/S e lógica não determinística.
Processar atividade do item de trabalho
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
private readonly ILogger<ProcessWorkItemActivity> _logger;
public ProcessWorkItemActivity(ILogger<ProcessWorkItemActivity> logger)
{
_logger = logger;
}
public override Task<Dictionary<string, int>> RunAsync(TaskActivityContext context, string workItem)
{
_logger.LogInformation("Processing work item: {WorkItem}", workItem);
// Process the work item (this is where you do the actual work)
var result = new Dictionary<string, int>
{
{ workItem, workItem.Length }
};
return Task.FromResult(result);
}
}
Atividade de resultados agregados
using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;
[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
private readonly ILogger<AggregateResultsActivity> _logger;
public AggregateResultsActivity(ILogger<AggregateResultsActivity> logger)
{
_logger = logger;
}
public override Task<Dictionary<string, int>> RunAsync(
TaskActivityContext context, Dictionary<string, int>[] results)
{
_logger.LogInformation("Aggregating {Count} results", results.Length);
// Combine all results into one aggregated result
var aggregatedResult = new Dictionary<string, int>();
foreach (var result in results)
{
foreach (var kvp in result)
{
aggregatedResult[kvp.Key] = kvp.Value;
}
}
return Task.FromResult(aggregatedResult);
}
}
Executar o exemplo
Inicie a orquestração no Windows enviando a seguinte solicitação HTTP POST:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
Como alternativa, em um aplicativo de funções do Linux, inicie a orquestração enviando a seguinte solicitação HTTP POST. Python atualmente é executado no Linux para Serviço de Aplicativo:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Observação
A HttpStart função espera JSON. Inclua o Content-Type: application/json cabeçalho e codifique o caminho do diretório como uma cadeia de caracteres JSON. O extrato HTTP pressupõe que host.json tenha uma entrada que remove o prefixo padrão api/ de todas as URLs de função de gatilho HTTP. Encontre a marcação para esta configuração no arquivo host.json de exemplo.
Esta solicitação HTTP dispara o orquestrador E2_BackupSiteContent e passa a cadeia de caracteres D:\home\LogFiles como um parâmetro. A resposta tem um link para verificar o status da operação de backup:
HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
(...trimmed...)
Dependendo do número de arquivos de log em seu aplicativo de funções, essa operação pode levar vários minutos para ser concluída. Obtenha o status mais recente consultando a URL no Location cabeçalho da resposta HTTP 202 anterior:
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}
Nesse caso, a função ainda está sendo executada. A resposta mostra a entrada que foi salva no estado do orquestrador e a hora da última atualização. Use o valor do cabeçalho Location para sondar a conclusão. Quando o status é "Concluído", a resposta se assemelha ao seguinte exemplo:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
A resposta mostra que a orquestração foi concluída e o tempo aproximado para concluir. O campo output indica que foram carregados cerca de 450 KB de registros pela orquestração.
Para executar o exemplo:
Inicie o emulador do Agendador de Tarefas Duráveis para desenvolvimento local.
docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latestInicia o worker para registrar o orquestrador e as atividades.
Execute o cliente para agendar uma orquestração com uma lista de itens de trabalho:
// Schedule the orchestration with a list of work items
var workItems = new List<string> { "item1", "item2", "item3", "item4", "item5" };
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
nameof(ParallelProcessingOrchestration), workItems);
// Wait for completion
var result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true);
Console.WriteLine($"Result: {result.ReadOutputAs<Dictionary<string, int>>().Count} items processed");
Próximas Etapas
Este exemplo mostra o padrão fan-out/fan-in. O exemplo a seguir mostra como implementar o padrão de monitor com temporizadores duráveis.
Este artigo demonstra o padrão fan-out/fan-in. Explore mais padrões e recursos:
Para obter exemplos do SDK do JavaScript, consulte os exemplos do SDK do JavaScript da Tarefa Durável.