Compartilhar via


Cenário de fan-out/fan-in nas Funções Duráveis – Exemplo de backup em nuvem

Fan-out/fan-in é o padrão de executar várias funções simultaneamente e, em seguida, executar alguma agregação dos resultados. Este artigo descreve um exemplo que usa as Funções Duráveis para implementar um cenário de fan-out/fan-in. O exemplo é uma função durável que faz o backup de todo ou de parte do conteúdo do site de um aplicativo no Armazenamento do Azure.

Observação

A versão 4 do modelo de programação Node.js para o Azure Functions está em disponibilidade geral. O novo modelo v4 é projetado para oferecer uma experiência mais flexível e intuitiva para desenvolvedores de JavaScript e TypeScript. Saiba mais sobre as diferenças entre v3 e v4 na guia de migração.

Nos trechos de código a seguir, o JavaScript (PM4) denota o modelo de programação V4, a nova experiência.

Pré-requisitos

Visão geral do cenário

Neste exemplo, as funções carregam todos os arquivos em um diretório especificado, recursivamente, no armazenamento de blobs. Elas também contam o número total de bytes que foram carregados.

É possível escrever uma única função que cuida de tudo. O principal problema que você teria seria a escalabilidade. Uma única função só pode ser executada em uma única máquina virtual, de modo que a taxa de transferência seria limitada à taxa de transferência dessa VM. Outro problema é a confiabilidade. Se houver um falha no meio do caminho ou se o processo inteiro levar mais de 5 minutos, o backup poderá falhar com um estado parcialmente concluído. Em seguida, ele precisaria ser reiniciado.

Uma abordagem mais robusta seria escrever duas funções regulares: um enumeraria os arquivos e adicionaria os nomes dos arquivo a uma fila e a outra leria da fila e carregaria os arquivos no armazenamento de blobs. Essa abordagem é melhor em termos de taxa de transferência e confiabilidade, mas requer que você provisione e gerencie uma fila. E, mais importante, uma complexidade significativa é introduzida em termos de gerenciamento de estado e coordenação se você quiser fazer qualquer outra coisa, como relatar o número total de bytes carregados.

Uma abordagem com as Funções Duráveis fornece todos os benefícios mencionados, com pouquíssima sobrecarga.

As funções

Este artigo explica as seguintes funções no aplicativo de exemplo:

  • E2_BackupSiteContent: uma função de orquestrador que chama E2_GetFileList para obter uma lista de arquivos para fazer backup e, em seguida, chama E2_CopyFileToBlob para fazer backup de cada arquivo.
  • E2_GetFileList: uma função de atividade que retorna uma lista de arquivos em um diretório.
  • E2_CopyFileToBlob: uma função de atividade que faz backup de um único arquivo para o Armazenamento de Blobs do Azure.

Função de orquestrador E2_BackupSiteContent

Essa função de orquestrador faz, essencialmente, o seguinte:

  1. Usa um valor de rootDirectory como um parâmetro de entrada.
  2. Chama uma função para obter uma lista recursiva de arquivos em rootDirectory.
  3. Faz várias chamadas de função paralelas para carregar cada arquivo no Armazenamento de Blobs do Azure.
  4. Aguarda que todos os uploads sejam concluídos.
  5. Retorna o total de bytes que foram carregados no Armazenamento de Blobs do Azure.

Este é o código que implementa a função de orquestrador:

[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

Observe a linha await Task.WhenAll(tasks);. Todas as chamadas individuais para a E2_CopyFileToBlob função não foram aguardadas, o que permite que elas sejam executadas em paralelo. Quando passamos essa matriz de tarefas para Task.WhenAll, obtemos uma tarefa que não será concluída até que todas as operações de cópia tenham sido concluídas. Se você estiver familiarizado com a TPL (biblioteca de paralelismo de tarefas) no .NET, isso não é novidade para você. A diferença é que essas tarefas poderiam ser executadas simultaneamente em várias máquinas, virtuais e a extensão Durable Functions assegura que a execução de ponta a ponta seja resiliente em caso de reciclagem do processo.

Depois de aguardar Task.WhenAll, sabemos que todas as chamadas de função foram concluídas e retornaram valores para nós. Cada chamada para E2_CopyFileToBlob retorna o número de bytes carregados, de forma que calcular a contagem total de bytes é uma questão de adicionar todos esses valores retornados.

Funções de atividade auxiliares

Funções de atividade auxiliares, assim como ocorre com os outros exemplos, são apenas funções regulares que usam a associação de gatilho activityTrigger.

Função de atividade de E2_GetFileList

[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

Observação

Você deve estar se perguntando por que você não poderia simplesmente colocar esse código diretamente na função de orquestrador. Você poderia, mas isso violaria uma das regras fundamentais das funções de orquestrador, de que elas nunca devem executar E/S, incluindo no caso de acesso ao sistema de arquivos local. Para obter mais informações, confira Restrições de código na função de orquestrador.

Função de atividade de E2_CopyFileToBlob

[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

Observação

Você precisará instalar o Microsoft.Azure.WebJobs.Extensions.Storage pacote do NuGet para executar o código de exemplo.

A função usa alguns recursos avançados das associações do Azure Functions (ou seja, o uso do Binder parâmetro), mas você não precisa se preocupar com esses detalhes no contexto deste passo a passo.

A implementação carrega o arquivo no disco e transmite de forma assíncrona o conteúdo para um blob de mesmo nome no contêiner "backups". O valor retornado é o número de bytes copiados para o armazenamento, que é usado pela função de orquestrador para calcular a soma agregada.

Observação

Este é um exemplo perfeito de movimentação de operações de E/S para uma função activityTrigger. Não só o trabalho pode ser distribuído entre várias máquinas virtuais diferentes, mas você também obtém os benefícios de fazer verificações pontuais do progresso. Se o processo de host for encerrado por algum motivo, você saberá quais carregamentos já foram concluídos.

Execute o exemplo

Você pode iniciar a orquestração, no Windows, enviando a solicitação HTTP POST a seguir.

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

Como alternativa, em um aplicativo de funções do Linux (o Python atualmente só é executado no Linux para o serviço de aplicativo), você pode iniciar a orquestração da seguinte maneira:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

Observação

A função HttpStart que você está invocando funciona somente com conteúdo formatado em JSON. Por esse motivo, o cabeçalho Content-Type: application/json é obrigatório e o caminho do diretório é codificado como uma cadeia de caracteres JSON. Além disso, o fragmento de HTTP assume que existe uma entrada no arquivo host.jsonque remove o prefixo padrãoapi/ de todas as URLs de funções do acionador HTTP. Você pode encontrar a marcação para essa configuração no arquivo host.json nos exemplos.

Esta solicitação HTTP dispara o orquestrador E2_BackupSiteContent e passa a cadeia de caracteres D:\home\LogFiles como um parâmetro. A resposta fornece um link para obter o status da operação de backup:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

Dependendo de quantos arquivos de log você tiver em seu aplicativo de funções, essa operação pode levar vários minutos para ser concluída. Você pode obter o status mais recente consultando a URL no cabeçalho Location da resposta HTTP 202 anterior.

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

Nesse caso, a função ainda está sendo executada. É possível ver a entrada que foi salva no estado do orquestrador e a hora da última atualização. Você pode continuar usando os valores de cabeçalho Location para sondar a conclusão. Quando o status for "Concluído", você verá um valor de resposta HTTP semelhante ao seguinte:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

Agora, você pode ver que a orquestração foi concluída e aproximadamente quanto tempo ela levou para ser concluída. Você também verá um valor para o campo output, o que indica que cerca de 450 KB de logs foram carregados.

Próximas etapas

Este exemplo mostra como implementar o padrão de fan-out/fan-in. O próximo exemplo mostra como implementar o padrão de monitor usando temporizadores variáveis.