Cenário fan-out/fan-in no Durable Functions – Exemplo de cópia de segurança na cloud
Artigo
Fan-out/fan-in refere-se ao padrão de execução simultânea de múltiplas funções e, em seguida, à realização de alguma agregação nos resultados. Este artigo explica um exemplo que utiliza Durable Functions para implementar um cenário de fan-in/fan-out. O exemplo é uma função durável que faz uma cópia de segurança de todo ou parte do conteúdo do site de uma aplicação no Armazenamento do Azure.
Nota
A versão 4 do modelo de programação Node.js para Funções do Azure está geralmente disponível. O novo modelo v4 foi concebido para ter uma experiência mais flexível e intuitiva para programadores de JavaScript e TypeScript. Saiba mais sobre as diferenças entre v3 e v4 no guia de migração.
Nos fragmentos de código seguintes, o JavaScript (PM4) indica o modelo de programação V4, a nova experiência.
Neste exemplo, as funções carregam todos os ficheiros num diretório especificado recursivamente para o armazenamento de blobs. Também contam o número total de bytes que foram carregados.
É possível escrever uma única função que trata de tudo. O principal problema com que se depararia é a escalabilidade. Uma execução de função única só pode ser executada numa única máquina virtual, pelo que o débito será limitado pelo débito dessa VM única. Outro problema é a fiabilidade. Se ocorrer uma falha a meio ou se todo o processo demorar mais de 5 minutos, a cópia de segurança poderá falhar num estado parcialmente concluído. Em seguida, teria de ser reiniciado.
Uma abordagem mais robusta seria escrever duas funções regulares: uma enumeraria os ficheiros e adicionava os nomes de ficheiros a uma fila e outra leria a partir da fila e carregaria os ficheiros para o armazenamento de blobs. Esta abordagem é melhor em termos de débito e fiabilidade, mas requer que aprovisione e faça a gestão de uma fila. Mais importante ainda, a complexidade significativa é introduzida em termos de gestão e coordenação do estado se quiser fazer mais alguma coisa, como reportar o número total de bytes carregados.
Uma abordagem Durable Functions dá-lhe todos os benefícios mencionados com uma sobrecarga muito baixa.
As funções
Este artigo explica as seguintes funções na aplicação de exemplo:
E2_BackupSiteContent: uma função do orquestrador que chama E2_GetFileList para obter uma lista de ficheiros para fazer uma cópia de segurança e, em seguida, chama E2_CopyFileToBlob para fazer uma cópia de segurança de cada ficheiro.
E2_GetFileList: uma função de atividade que devolve uma lista de ficheiros num diretório.
E2_CopyFileToBlob: uma função de atividade que cria uma cópia de segurança de um único ficheiro para Armazenamento de Blobs do Azure.
E2_BackupSiteContent função orchestrator
Esta função de orquestrador faz essencialmente o seguinte:
Assume um rootDirectory valor como um parâmetro de entrada.
Chama uma função para obter uma lista recursiva de ficheiros em rootDirectory.
Faz várias chamadas de função paralelas para carregar cada ficheiro para Armazenamento de Blobs do Azure.
Aguarda a conclusão de todos os carregamentos.
Devolve a soma total de bytes que foram carregados para Armazenamento de Blobs do Azure.
Eis o código que implementa a função orchestrator:
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Repare na await Task.WhenAll(tasks); linha. Todas as chamadas individuais para a E2_CopyFileToBlob função não eram aguardadas, o que lhes permite executar em paralelo. Quando transmitimos esta matriz de tarefas para Task.WhenAll, recuperamos uma tarefa que só será concluída depois de concluídas todas as operações de cópia. Se estiver familiarizado com a Biblioteca Paralela de Tarefas (TPL) no .NET, isto não é novidade para si. A diferença é que estas tarefas podem estar em execução em várias máquinas virtuais em simultâneo e a extensão de Durable Functions garante que a execução ponto a ponto é resiliente à reciclagem do processo.
Depois de aguardar a partir de Task.WhenAll, sabemos que todas as chamadas de função foram concluídas e devolveram-nos valores. Cada chamada para E2_CopyFileToBlob devolve o número de bytes carregados, pelo que calcular a soma total da contagem de bytes é uma questão de adicionar todos esses valores devolvidos em conjunto.
A função utiliza o function.json padrão para funções de orquestrador.
Eis o código que implementa a função orchestrator:
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
Repare na yield context.df.Task.all(tasks); linha. Não foram efetuadas todas as chamadas individuais para a E2_CopyFileToBlob função, o que lhes permite executar em paralelo. Quando transmitimos esta matriz de tarefas para context.df.Task.all, recuperamos uma tarefa que só será concluída depois de concluídas todas as operações de cópia. Se estiver familiarizado com Promise.all o JavaScript, isto não é novidade para si. A diferença é que estas tarefas podem estar em execução em várias máquinas virtuais em simultâneo e a extensão de Durable Functions garante que a execução ponto a ponto é resiliente à reciclagem do processo.
Nota
Embora as tarefas sejam conceptualmente semelhantes às promessas de JavaScript, as funções do orquestrador devem utilizar context.df.Task.all e context.df.Task.any em vez de e Promise.race gerir a paralelização de Promise.all tarefas.
Depois de ceder a , context.df.Task.allsabemos que todas as chamadas de função foram concluídas e devolveram-nos valores. Cada chamada para E2_CopyFileToBlob devolve o número de bytes carregados, pelo que calcular a soma total da contagem de bytes é uma questão de adicionar todos esses valores devolvidos em conjunto.
Eis o código que implementa a função orchestrator:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
Repare na yield context.df.Task.all(tasks); linha. Não foram efetuadas todas as chamadas individuais para a copyFileToBlob função, o que lhes permite executar em paralelo. Quando transmitimos esta matriz de tarefas para context.df.Task.all, recuperamos uma tarefa que só será concluída depois de concluídas todas as operações de cópia. Se estiver familiarizado com Promise.all o JavaScript, isto não é novidade para si. A diferença é que estas tarefas podem estar em execução em várias máquinas virtuais em simultâneo e a extensão de Durable Functions garante que a execução ponto a ponto é resiliente à reciclagem do processo.
Nota
Embora as Tarefas sejam conceptualmente semelhantes às promessas de JavaScript, as funções do orquestrador devem utilizar context.df.Task.all e context.df.Task.any em vez de e Promise.race gerir a paralelização de Promise.all tarefas.
Depois de ceder a , context.df.Task.allsabemos que todas as chamadas de função foram concluídas e devolveram-nos valores. Cada chamada para copyFileToBlob devolve o número de bytes carregados, pelo que calcular a soma total da contagem de bytes é uma questão de adicionar todos esses valores devolvidos em conjunto.
A função utiliza o function.json padrão para funções de orquestrador.
Eis o código que implementa a função orchestrator:
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Repare na yield context.task_all(tasks); linha. Não foram efetuadas todas as chamadas individuais para a E2_CopyFileToBlob função, o que lhes permite executar em paralelo. Quando transmitimos esta matriz de tarefas para context.task_all, recuperamos uma tarefa que só será concluída depois de concluídas todas as operações de cópia. Se estiver familiarizado com asyncio.gather o Python, isto não é novidade para si. A diferença é que estas tarefas podem estar em execução em várias máquinas virtuais em simultâneo e a extensão de Durable Functions garante que a execução ponto a ponto é resiliente à reciclagem do processo.
Nota
Embora as tarefas sejam conceptualmente semelhantes aos que o Python aguarda, as funções do orquestrador devem utilizar yield , bem como as context.task_all APIs e context.task_any para gerir a paralelização de tarefas.
Depois de ceder a , context.task_allsabemos que todas as chamadas de função foram concluídas e devolveram-nos valores. Cada chamada para E2_CopyFileToBlob devolve o número de bytes carregados, para que possamos calcular a soma total da contagem de bytes ao adicionar todos os valores devolvidos em conjunto.
Funções de atividade de programa auxiliar
As funções de atividade auxiliar, tal como com outras amostras, são apenas funções regulares que utilizam o enlace do acionador activityTrigger .
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
Nota
Pode estar a perguntar-se por que motivo não pôde colocar este código diretamente na função do orquestrador. Poderia, mas isso quebraria uma das regras fundamentais das funções do orquestrador, que é que nunca devem fazer E/S, incluindo o acesso ao sistema de ficheiros local. Para obter mais informações, veja Orchestrator function code constraints (Restrições de código de função do Orchestrator).
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Nota
Terá de instalar o Microsoft.Azure.WebJobs.Extensions.Storage pacote NuGet para executar o código de exemplo.
A função utiliza algumas funcionalidades avançadas de enlaces de Funções do Azure (ou seja, a utilização do Binder parâmetro), mas não precisa de se preocupar com esses detalhes para efeitos destas instruções.
O ficheiro function.json para E2_CopyFileToBlob é igualmente simples:
A implementação do JavaScript utiliza um enlace de copyFileToBlob saída do Armazenamento do Azure para carregar os ficheiros para o armazenamento de Blobs do Azure.
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
A implementação carrega o ficheiro a partir do disco e transmite os conteúdos de forma assíncrona para um blob com o mesmo nome no contentor "cópias de segurança". O valor devolvido é o número de bytes copiados para o armazenamento, que é depois utilizado pela função orchestrator para calcular a soma agregada.
Nota
Este é um exemplo perfeito de mover operações de E/S para uma activityTrigger função. Não só o trabalho pode ser distribuído em muitas máquinas diferentes, como também pode obter as vantagens de controlar o progresso. Se o processo de anfitrião for terminado por qualquer motivo, sabe quais os carregamentos que já foram concluídos.
Executar o exemplo
Pode iniciar a orquestração, no Windows, enviando o seguinte pedido HTTP POST.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
Em alternativa, numa Aplicação de Funções do Linux (atualmente, o Python só é executado no Linux durante Serviço de Aplicações), pode iniciar a orquestração da seguinte forma:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Nota
A HttpStart função que está a invocar só funciona com conteúdo formatado em JSON. Por este motivo, o Content-Type: application/json cabeçalho é necessário e o caminho do diretório é codificado como uma cadeia JSON. Além disso, o fragmento HTTP pressupõe que existe uma entrada no host.json ficheiro que remove o prefixo predefinido api/ de todos os URLs das funções de acionador HTTP. Pode encontrar a marcação para esta configuração no host.json ficheiro nos exemplos.
Este pedido HTTP aciona o E2_BackupSiteContent orquestrador e transmite a cadeia D:\home\LogFiles como um parâmetro. A resposta fornece uma ligação para obter o estado da operação de cópia de segurança:
Dependendo do número de ficheiros de registo que tem na sua aplicação de funções, esta operação pode demorar vários minutos a ser concluída. Pode obter o estado mais recente ao consultar o URL no Location cabeçalho da resposta HTTP 202 anterior.
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
Neste caso, a função ainda está em execução. Pode ver a entrada que foi guardada no estado do orquestrador e a última hora atualizada. Pode continuar a utilizar os valores de Location cabeçalho para consultar a conclusão. Quando o estado for "Concluído", verá um valor de resposta HTTP semelhante ao seguinte:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
Agora pode ver que a orquestração está concluída e, aproximadamente, quanto tempo demorou a concluir. Também verá um valor para o campo, que output indica que foram carregados cerca de 450 KB de registos.
Passos seguintes
Este exemplo mostrou como implementar o padrão fan-out/fan-in. O exemplo seguinte mostra como implementar o padrão de monitorização com temporizadores duráveis.