Partilhar via


Cenário fan-out/fan-in no Durable Functions – Exemplo de cópia de segurança na cloud

Fan-out/fan-in refere-se ao padrão de execução simultânea de múltiplas funções e, em seguida, à realização de alguma agregação nos resultados. Este artigo explica um exemplo que utiliza Durable Functions para implementar um cenário de fan-in/fan-out. O exemplo é uma função durável que faz uma cópia de segurança de todo ou parte do conteúdo do site de uma aplicação no Armazenamento do Azure.

Nota

A versão 4 do modelo de programação Node.js para Funções do Azure está geralmente disponível. O novo modelo v4 foi concebido para ter uma experiência mais flexível e intuitiva para programadores de JavaScript e TypeScript. Saiba mais sobre as diferenças entre v3 e v4 no guia de migração.

Nos fragmentos de código seguintes, o JavaScript (PM4) indica o modelo de programação V4, a nova experiência.

Pré-requisitos

Scenario overview (Descrição geral do cenário)

Neste exemplo, as funções carregam todos os ficheiros num diretório especificado recursivamente para o armazenamento de blobs. Também contam o número total de bytes que foram carregados.

É possível escrever uma única função que trata de tudo. O principal problema com que se depararia é a escalabilidade. Uma execução de função única só pode ser executada numa única máquina virtual, pelo que o débito será limitado pelo débito dessa VM única. Outro problema é a fiabilidade. Se ocorrer uma falha a meio ou se todo o processo demorar mais de 5 minutos, a cópia de segurança poderá falhar num estado parcialmente concluído. Em seguida, teria de ser reiniciado.

Uma abordagem mais robusta seria escrever duas funções regulares: uma enumeraria os ficheiros e adicionava os nomes de ficheiros a uma fila e outra leria a partir da fila e carregaria os ficheiros para o armazenamento de blobs. Esta abordagem é melhor em termos de débito e fiabilidade, mas requer que aprovisione e faça a gestão de uma fila. Mais importante ainda, a complexidade significativa é introduzida em termos de gestão e coordenação do estado se quiser fazer mais alguma coisa, como reportar o número total de bytes carregados.

Uma abordagem Durable Functions dá-lhe todos os benefícios mencionados com uma sobrecarga muito baixa.

As funções

Este artigo explica as seguintes funções na aplicação de exemplo:

  • E2_BackupSiteContent: uma função do orquestrador que chama E2_GetFileList para obter uma lista de ficheiros para fazer uma cópia de segurança e, em seguida, chama E2_CopyFileToBlob para fazer uma cópia de segurança de cada ficheiro.
  • E2_GetFileList: uma função de atividade que devolve uma lista de ficheiros num diretório.
  • E2_CopyFileToBlob: uma função de atividade que cria uma cópia de segurança de um único ficheiro para Armazenamento de Blobs do Azure.

E2_BackupSiteContent função orchestrator

Esta função de orquestrador faz essencialmente o seguinte:

  1. Assume um rootDirectory valor como um parâmetro de entrada.
  2. Chama uma função para obter uma lista recursiva de ficheiros em rootDirectory.
  3. Faz várias chamadas de função paralelas para carregar cada ficheiro para Armazenamento de Blobs do Azure.
  4. Aguarda a conclusão de todos os carregamentos.
  5. Devolve a soma total de bytes que foram carregados para Armazenamento de Blobs do Azure.

Eis o código que implementa a função orchestrator:

[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

Repare na await Task.WhenAll(tasks); linha. Todas as chamadas individuais para a E2_CopyFileToBlob função não eram aguardadas, o que lhes permite executar em paralelo. Quando transmitimos esta matriz de tarefas para Task.WhenAll, recuperamos uma tarefa que só será concluída depois de concluídas todas as operações de cópia. Se estiver familiarizado com a Biblioteca Paralela de Tarefas (TPL) no .NET, isto não é novidade para si. A diferença é que estas tarefas podem estar em execução em várias máquinas virtuais em simultâneo e a extensão de Durable Functions garante que a execução ponto a ponto é resiliente à reciclagem do processo.

Depois de aguardar a partir de Task.WhenAll, sabemos que todas as chamadas de função foram concluídas e devolveram-nos valores. Cada chamada para E2_CopyFileToBlob devolve o número de bytes carregados, pelo que calcular a soma total da contagem de bytes é uma questão de adicionar todos esses valores devolvidos em conjunto.

Funções de atividade de programa auxiliar

As funções de atividade auxiliar, tal como com outras amostras, são apenas funções regulares que utilizam o enlace do acionador activityTrigger .

E2_GetFileList função de atividade

[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

Nota

Pode estar a perguntar-se por que motivo não pôde colocar este código diretamente na função do orquestrador. Poderia, mas isso quebraria uma das regras fundamentais das funções do orquestrador, que é que nunca devem fazer E/S, incluindo o acesso ao sistema de ficheiros local. Para obter mais informações, veja Orchestrator function code constraints (Restrições de código de função do Orchestrator).

E2_CopyFileToBlob função de atividade

[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

Nota

Terá de instalar o Microsoft.Azure.WebJobs.Extensions.Storage pacote NuGet para executar o código de exemplo.

A função utiliza algumas funcionalidades avançadas de enlaces de Funções do Azure (ou seja, a utilização do Binder parâmetro), mas não precisa de se preocupar com esses detalhes para efeitos destas instruções.

A implementação carrega o ficheiro a partir do disco e transmite os conteúdos de forma assíncrona para um blob com o mesmo nome no contentor "cópias de segurança". O valor devolvido é o número de bytes copiados para o armazenamento, que é depois utilizado pela função orchestrator para calcular a soma agregada.

Nota

Este é um exemplo perfeito de mover operações de E/S para uma activityTrigger função. Não só o trabalho pode ser distribuído em muitas máquinas diferentes, como também pode obter as vantagens de controlar o progresso. Se o processo de anfitrião for terminado por qualquer motivo, sabe quais os carregamentos que já foram concluídos.

Executar o exemplo

Pode iniciar a orquestração, no Windows, enviando o seguinte pedido HTTP POST.

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

Em alternativa, numa Aplicação de Funções do Linux (atualmente, o Python só é executado no Linux durante Serviço de Aplicações), pode iniciar a orquestração da seguinte forma:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

Nota

A HttpStart função que está a invocar só funciona com conteúdo formatado em JSON. Por este motivo, o Content-Type: application/json cabeçalho é necessário e o caminho do diretório é codificado como uma cadeia JSON. Além disso, o fragmento HTTP pressupõe que existe uma entrada no host.json ficheiro que remove o prefixo predefinido api/ de todos os URLs das funções de acionador HTTP. Pode encontrar a marcação para esta configuração no host.json ficheiro nos exemplos.

Este pedido HTTP aciona o E2_BackupSiteContent orquestrador e transmite a cadeia D:\home\LogFiles como um parâmetro. A resposta fornece uma ligação para obter o estado da operação de cópia de segurança:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

Dependendo do número de ficheiros de registo que tem na sua aplicação de funções, esta operação pode demorar vários minutos a ser concluída. Pode obter o estado mais recente ao consultar o URL no Location cabeçalho da resposta HTTP 202 anterior.

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

Neste caso, a função ainda está em execução. Pode ver a entrada que foi guardada no estado do orquestrador e a última hora atualizada. Pode continuar a utilizar os valores de Location cabeçalho para consultar a conclusão. Quando o estado for "Concluído", verá um valor de resposta HTTP semelhante ao seguinte:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

Agora pode ver que a orquestração está concluída e, aproximadamente, quanto tempo demorou a concluir. Também verá um valor para o campo, que output indica que foram carregados cerca de 450 KB de registos.

Passos seguintes

Este exemplo mostrou como implementar o padrão fan-out/fan-in. O exemplo seguinte mostra como implementar o padrão de monitorização com temporizadores duráveis.