Scenario di fan-out/fan-it in Funzioni permanenti - Esempio di backup cloud
Articolo
Fan-out/fan-in fa riferimento al modello di esecuzione di più funzioni contemporaneamente e quindi di aggregazione dei risultati. Questo articolo illustra un esempio che usa Funzioni permanenti per implementare uno scenario di fan-in/fan-out. L'esempio è una funzione permanente che esegue il backup di tutto o di una parte del contenuto del sito di un'app in Archiviazione di Azure.
Nota
La versione 4 del modello di programmazione Node.js per Funzioni di Azure è disponibile a livello generale. Il nuovo modello v4 è progettato per offrire un'esperienza più flessibile e intuitiva per gli sviluppatori JavaScript e TypeScript. Altre informazioni sulle differenze tra v3 e v4 sono disponibili nella guida alla migrazione.
Nei frammenti di codice seguenti JavaScript (PM4) indica il modello di programmazione V4, la nuova esperienza.
In questo esempio le funzioni caricano tutti i file in modo ricorsivo in una directory specificata nell'archiviazione BLOB e contano anche il numero totale di byte caricati.
È possibile scrivere una singola funzione che esegua tutte le operazioni. Il problema principale da affrontare è costituito dalla scalabilità. Una singola funzione può essere eseguita solo in un'unica macchina virtuale, pertanto la velocità effettiva sarà limitata a quella di tale VM. Un altro problema da affrontare è l'affidabilità. Se durante l'esecuzione si verifica un errore o se l'intero processo richiede più di 5 minuti, il backup potrebbe non riuscire e terminare in uno stato parzialmente completato. con la necessità di essere riavviato.
Un approccio più efficace consiste nello scrivere due funzioni regolari, una per enumerare i file e aggiungere i nomi di file a una coda e un'altra per leggere dalla coda e caricare i file nell'archiviazione BLOB. Questo approccio è migliore in termini di velocità effettiva e affidabilità, ma richiede di eseguire il provisioning e di gestire una coda. Aspetto ancora più importante, in questo caso viene introdotta una complessità significativa in termini di gestione dello stato e di coordinamento se si desidera eseguire altre operazioni, ad esempio indicare il numero totale di byte caricati.
Un approccio tramite Funzioni permanenti è caratterizzato da tutti i vantaggi citati con un overhead molto basso.
Funzioni
Questo articolo descrive le funzioni seguenti nell'app di esempio:
E2_BackupSiteContent: funzione dell'agente di orchestrazione che chiama E2_GetFileList per ottenere un elenco di file di cui eseguire il backup, quindi chiama E2_CopyFileToBlob per eseguire il backup di ogni file.
E2_GetFileList: funzione di attività che restituisce un elenco di file in una directory.
E2_CopyFileToBlob: funzione di attività che esegue il backup di un singolo file in Archiviazione BLOB di Azure.
E2_BackupSiteContent funzione dell'agente di orchestrazione
Le operazioni di questa funzione dell'agente di orchestrazione sono le seguenti:
Acquisizione di un valore rootDirectory come parametro di input.
Chiamata di una funzione per ottenere un elenco ricorsivo di file in rootDirectory.
Esecuzione di più chiamate di funzioni in parallelo per caricare ogni file in Archiviazione BLOB di Azure.
Attesa del completamento di tutti i caricamenti.
Restituzione dei byte totali caricati in Archiviazione BLOB di Azure.
Il codice che implementa la funzione dell'agente di orchestrazione è il seguente:
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Si noti la riga await Task.WhenAll(tasks);. Non sono state attese tutte le singole chiamate alla funzione E2_CopyFileToBlob, che ne consente l'esecuzione in parallelo. Quando si passa questa matrice di attività a Task.WhenAll, viene restituita un'attività che non verrà completata fino al completamento di tutte le operazioni di copia. Se si ha familiarità con Task Parallel Library (TPL) in .NET, questo scenario non è una novità. La differenza è che queste attività potrebbero essere in esecuzione in più macchine virtuali contemporaneamente e l'estensione di Durable Functions assicura che l'esecuzione end-to-end sia resiliente al riciclo dei processi.
Dopo l'attesa da Task.WhenAll, tutte le chiamate di funzione sono state completate e hanno restituito valori. Ogni chiamata a E2_CopyFileToBlob restituisce il numero di byte caricato e di conseguenza per calcolare il numero di byte totale è sufficiente sommare tutti i valori restituiti.
La funzione usa il codice function.json standard per le funzioni dell'agente di orchestrazione.
Il codice che implementa la funzione dell'agente di orchestrazione è il seguente:
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
Si noti la riga yield context.df.Task.all(tasks);. Non sono state restituite tutte le singole chiamate alla funzioneE2_CopyFileToBlob, che ne consente l'esecuzione in parallelo. Quando si passa questa matrice di attività a context.df.Task.all, viene restituita un'attività che non verrà completata fino al completamento di tutte le operazioni di copia. Se si ha familiarità con Promise.all in JavaScript, non rappresenta una novità. La differenza è che queste attività potrebbero essere in esecuzione in più macchine virtuali contemporaneamente e l'estensione di Durable Functions assicura che l'esecuzione end-to-end sia resiliente al riciclo dei processi.
Nota
Anche se le attività sono concettualmente simili alle promesse JavaScript, le funzioni di orchestrazione dovrebbero usare context.df.Task.all e context.df.Task.any invece di Promise.all e Promise.race per gestire la parallelizzazione delle attività.
Dopo la restituzione da context.df.Task.all, tutte le chiamate di funzione sono state completate e hanno restituito valori. Ogni chiamata a E2_CopyFileToBlob restituisce il numero di byte caricato e di conseguenza per calcolare il numero di byte totale è sufficiente sommare tutti i valori restituiti.
Il codice che implementa la funzione dell'agente di orchestrazione è il seguente:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
Si noti la riga yield context.df.Task.all(tasks);. Non sono state restituite tutte le singole chiamate alla funzionecopyFileToBlob, che ne consente l'esecuzione in parallelo. Quando si passa questa matrice di attività a context.df.Task.all, viene restituita un'attività che non verrà completata fino al completamento di tutte le operazioni di copia. Se si ha familiarità con Promise.all in JavaScript, non rappresenta una novità. La differenza è che queste attività potrebbero essere in esecuzione in più macchine virtuali contemporaneamente e l'estensione di Durable Functions assicura che l'esecuzione end-to-end sia resiliente al riciclo dei processi.
Nota
Anche se le attività sono concettualmente simili alle promesse JavaScript, le funzioni di orchestrazione dovrebbero usare context.df.Task.all e context.df.Task.any invece di Promise.all e Promise.race per gestire la parallelizzazione delle attività.
Dopo la restituzione da context.df.Task.all, tutte le chiamate di funzione sono state completate e hanno restituito valori. Ogni chiamata a copyFileToBlob restituisce il numero di byte caricato e di conseguenza per calcolare il numero di byte totale è sufficiente sommare tutti i valori restituiti.
La funzione usa il codice function.json standard per le funzioni dell'agente di orchestrazione.
Il codice che implementa la funzione dell'agente di orchestrazione è il seguente:
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Si noti la riga yield context.task_all(tasks);. Non sono state restituite tutte le singole chiamate alla funzioneE2_CopyFileToBlob, che ne consente l'esecuzione in parallelo. Quando si passa questa matrice di attività a context.task_all, viene restituita un'attività che non verrà completata fino al completamento di tutte le operazioni di copia. Se si ha familiarità con asyncio.gather in Python, non rappresenta una novità. La differenza è che queste attività potrebbero essere in esecuzione in più macchine virtuali contemporaneamente e l'estensione di Durable Functions assicura che l'esecuzione end-to-end sia resiliente al riciclo dei processi.
Nota
Anche se le attività sono concettualmente simili agli awaitable Python, le funzioni di orchestrazione dovrebbero usare le API yield, context.task_all e context.task_any per gestire la parallelizzazione delle attività.
Dopo la restituzione da context.task_all, tutte le chiamate di funzione sono state completate e hanno restituito valori. Ogni chiamata a E2_CopyFileToBlob restituisce il numero di byte caricato, così da poter calcolare il numero di byte totale sommando tutti i valori restituiti.
Funzioni di attività helper
Le funzioni di attività helper, in modo analogo agli altri esempi, sono normali funzioni che usano l'associazione di trigger activityTrigger.
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
Nota
È lecito chiedersi perché non è sufficiente inserire questo codice direttamente nella funzione dell'agente di orchestrazione. Sebbene possibile, questa operazione violerebbe una delle regole fondamentali delle funzioni dell'agente di orchestrazione, ovvero quella in base alla quale non è consigliabile che tali funzioni eseguano operazioni di I/O, incluso l'accesso al file system locale. Per altre informazioni, vedere Vincoli di codice delle funzioni dell'agente di orchestrazione.
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Nota
È necessario installare il Microsoft.Azure.WebJobs.Extensions.Storage pacchetto NuGet per eseguire il codice di esempio.
La funzione usa delle funzionalità avanzate di associazioni di Funzioni di Azure (ovvero l'uso del Binder parametro), senza che sia necessario preoccuparsi di tali informazioni per gli scopi di questa procedura dettagliata.
Il file function.json per E2_CopyFileToBlob è analogamente semplice:
L'implementazione JavaScript di copyFileToBlob usa un'associazione di output di Archiviazione di Azure per caricare i file nell'Archiviazione BLOB di Azure.
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
L'implementazione carica il file dal disco e trasmette il contenuto in modo asincrono in un BLOB con lo stesso nome nel contenitore dei backup. Il valore restituito è il numero di byte copiati nell'archiviazione, che viene quindi usato dalla funzione dell'agente di orchestrazione per calcolare la somma di aggregazione.
Nota
Questo è un esempio perfetto dello spostamento delle operazioni di I/O in una funzione activityTrigger. In questo modo non solo il lavoro può essere distribuito tra molti computer differenti, ma è anche possibile ottenere i vantaggi determinati dall'uso di checkpoint per lo stato di avanzamento. Se il processo host viene interrotto per qualsiasi motivo, è possibile conoscere quali caricamenti sono già stati completati.
Eseguire l'esempio
È possibile avviare l'orchestrazione in Windows inviando la richiesta HTTP POST seguente.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
In alternativa, in un'app per le funzioni Linux (Python attualmente viene eseguito solo in Linux per il servizio app), è possibile avviare l'orchestrazione come segue:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Nota
La funzione HttpStart richiamata funziona solo con contenuto in formato JSON. Per questo motivo, l'intestazione Content-Type: application/json è necessaria e il percorso della directory viene codificato come una stringa JSON. Il frammento di codice HTTP precedente presuppone inoltre che sia presente una voce nel file host.json che consente di rimuovere il prefisso predefinito api/ da tutti gli URL di funzioni di trigger HTTP. È possibile trovare il markup per la configurazione nel file host.json negli esempi.
Questa richiesta HTTP attiva l'agente di orchestrazione E2_BackupSiteContent e la stringa D:\home\LogFiles viene passata come parametro. La risposta restituisce un collegamento per ottenere lo stato dell'operazione di backup:
A seconda del numero di file di log presenti nell'app per le funzioni, questa operazione potrebbe richiedere alcuni minuti. È possibile ottenere lo stato più recente eseguendo una query sull'URL nell'intestazione Location della risposta HTTP 202 precedente.
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
In questo caso la funzione è ancora in esecuzione. È ora possibile visualizzare l'input che è stato salvato nello stato dell'agente di orchestrazione e l'ora dell'ultimo aggiornamento. È possibile continuare a usare i valori dell'intestazione Location per eseguire il polling per il completamento. Quando lo stato è completato, viene visualizzato un valore di risposta HTTP simile al seguente:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
Ora è possibile visualizzare che l'orchestrazione è stata completata e approssimativamente il tempo necessario per il completamento. Viene inoltre visualizzato un valore per il campo output, che indica che sono stati caricati circa 450 KB di log.
Passaggi successivi
In questo esempio è stato illustrato come implementare il criterio di fan-out/fan-in. L'esempio successivo illustra come implementare il modello di monitoraggio usando i timer permanenti.