Scenario di fan-out/fan-it in Funzioni permanenti - Esempio di backup cloud

Fan-out/fan-in fa riferimento al modello di esecuzione di più funzioni contemporaneamente e quindi di aggregazione dei risultati. Questo articolo illustra un esempio che usa Funzioni permanenti per implementare uno scenario di fan-in/fan-out. L'esempio è una funzione permanente che esegue il backup di tutto o di una parte del contenuto del sito di un'app in Archiviazione di Azure.

Nota

La versione 4 del modello di programmazione Node.js per Funzioni di Azure è disponibile a livello generale. Il nuovo modello v4 è progettato per offrire agli sviluppatori JavaScript e TypeScript un'esperienza più flessibile e intuitiva. Altre informazioni sulle differenze tra v3 e v4 sono disponibili nella guida alla migrazione.

Nei frammenti di codice seguenti JavaScript (PM4) indica il modello di programmazione V4, la nuova esperienza.

Prerequisiti

Panoramica dello scenario

In questo esempio le funzioni caricano tutti i file in modo ricorsivo in una directory specificata nell'archiviazione BLOB e contano anche il numero totale di byte caricati.

È possibile scrivere una singola funzione che esegua tutte le operazioni. Il problema principale da affrontare è costituito dalla scalabilità. Un'esecuzione di una singola funzione può essere eseguita solo in una singola macchina virtuale, pertanto la velocità effettiva sarà limitata dalla velocità effettiva di tale singola macchina virtuale. Un altro problema da affrontare è l'affidabilità. Se si verifica un errore a metà strada o se l'intero processo richiede più di 5 minuti, il backup potrebbe non riuscire in uno stato parzialmente completato. con la necessità di essere riavviato.

Un approccio più efficace consiste nello scrivere due funzioni regolari, una per enumerare i file e aggiungere i nomi di file a una coda e un'altra per leggere dalla coda e caricare i file nell'archiviazione BLOB. Questo approccio è migliore in termini di velocità effettiva e affidabilità, ma richiede il provisioning e la gestione di una coda. Aspetto ancora più importante, in questo caso viene introdotta una complessità significativa in termini di gestione dello stato e di coordinamento se si desidera eseguire altre operazioni, ad esempio indicare il numero totale di byte caricati.

Un approccio tramite Funzioni permanenti è caratterizzato da tutti i vantaggi citati con un overhead molto basso.

Funzioni

Questo articolo descrive le funzioni seguenti nell'app di esempio:

  • E2_BackupSiteContent: funzione dell'agente di orchestrazione che chiama E2_GetFileList per ottenere un elenco di file di cui eseguire il backup, quindi chiama E2_CopyFileToBlob per eseguire il backup di ogni file.
  • E2_GetFileList: funzione di attività che restituisce un elenco di file in una directory.
  • E2_CopyFileToBlob: funzione di attività che esegue il backup di un singolo file in Archiviazione BLOB di Azure.

funzione dell'agente di orchestrazione E2_BackupSiteContent

Le operazioni di questa funzione dell'agente di orchestrazione sono le seguenti:

  1. Acquisizione di un valore rootDirectory come parametro di input.
  2. Chiamata di una funzione per ottenere un elenco ricorsivo di file in rootDirectory.
  3. Esecuzione di più chiamate di funzioni in parallelo per caricare ogni file in Archiviazione BLOB di Azure.
  4. Attesa del completamento di tutti i caricamenti.
  5. Restituzione dei byte totali caricati in Archiviazione BLOB di Azure.

Il codice che implementa la funzione dell'agente di orchestrazione è il seguente:

[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

Si noti la riga await Task.WhenAll(tasks);. Non sono state attese tutte le singole chiamate alla E2_CopyFileToBlob funzione, che consente l'esecuzione in parallelo. Quando si passa questa matrice di attività a Task.WhenAll, viene restituita un'attività che non verrà completata fino al completamento di tutte le operazioni di copia. Se si ha familiarità con Task Parallel Library (TPL) in .NET, questo scenario non è una novità. La differenza è che queste attività potrebbero essere in esecuzione in più macchine virtuali contemporaneamente e l'estensione Durable Functions garantisce che l'esecuzione end-to-end sia resiliente al riciclo dei processi.

Dopo l'attesa da Task.WhenAll, tutte le chiamate di funzione sono state completate e hanno restituito valori. Ogni chiamata a E2_CopyFileToBlob restituisce il numero di byte caricato e di conseguenza per calcolare il numero di byte totale è sufficiente sommare tutti i valori restituiti.

Funzioni di attività helper

Le funzioni di attività helper, in modo analogo agli altri esempi, sono normali funzioni che usano l'associazione di trigger activityTrigger.

funzione di attività E2_GetFileList

[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

Nota

È lecito chiedersi perché non è sufficiente inserire questo codice direttamente nella funzione dell'agente di orchestrazione. Sebbene possibile, questa operazione violerebbe una delle regole fondamentali delle funzioni dell'agente di orchestrazione, ovvero quella in base alla quale non è consigliabile che tali funzioni eseguano operazioni di I/O, incluso l'accesso al file system locale. Per altre informazioni, vedere Vincoli di codice della funzione Orchestrator.

funzione attività E2_CopyFileToBlob

[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

Nota

È necessario installare il Microsoft.Azure.WebJobs.Extensions.Storage pacchetto NuGet per eseguire il codice di esempio.

La funzione usa alcune funzionalità avanzate di associazioni Funzioni di Azure, ovvero l'uso del parametro, ma non è necessario preoccuparsi di tali dettagli per lo scopo di Binderquesta procedura dettagliata.

L'implementazione carica il file dal disco e trasmette il contenuto in modo asincrono in un BLOB con lo stesso nome nel contenitore dei backup. Il valore restituito è il numero di byte copiati nell'archiviazione, che viene quindi usato dalla funzione dell'agente di orchestrazione per calcolare la somma di aggregazione.

Nota

Questo è un esempio perfetto dello spostamento delle operazioni di I/O in una funzione activityTrigger. Non solo il lavoro può essere distribuito in molti computer diversi, ma si ottengono anche i vantaggi del checkpoint dello stato di avanzamento. Se il processo host viene interrotto per qualsiasi motivo, è possibile conoscere quali caricamenti sono già stati completati.

Eseguire l'esempio

È possibile avviare l'orchestrazione, in Windows, inviando la richiesta HTTP POST seguente.

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

In alternativa, in un'app per le funzioni Linux (Python è attualmente in esecuzione solo in Linux per servizio app), è possibile avviare l'orchestrazione come segue:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

Nota

La funzione HttpStart richiamata funziona solo con contenuto in formato JSON. Per questo motivo, l'intestazione Content-Type: application/json è necessaria e il percorso della directory viene codificato come una stringa JSON. Il frammento di codice HTTP precedente presuppone inoltre che sia presente una voce nel file host.json che consente di rimuovere il prefisso predefinito api/ da tutti gli URL di funzioni di trigger HTTP. È possibile trovare il markup per la configurazione nel file host.json negli esempi.

Questa richiesta HTTP attiva l'agente di orchestrazione E2_BackupSiteContent e la stringa D:\home\LogFiles viene passata come parametro. La risposta restituisce un collegamento per ottenere lo stato dell'operazione di backup:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

A seconda del numero di file di log presenti nell'app per le funzioni, questa operazione potrebbe richiedere alcuni minuti. È possibile ottenere lo stato più recente eseguendo una query sull'URL nell'intestazione Location della risposta HTTP 202 precedente.

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

In questo caso la funzione è ancora in esecuzione. È ora possibile visualizzare l'input che è stato salvato nello stato dell'agente di orchestrazione e l'ora dell'ultimo aggiornamento. È possibile continuare a usare i valori dell'intestazione Location per eseguire il polling per il completamento. Quando lo stato è completato, viene visualizzato un valore di risposta HTTP simile al seguente:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

Ora è possibile visualizzare che l'orchestrazione è stata completata e approssimativamente il tempo necessario per il completamento. Viene inoltre visualizzato un valore per il campo output, che indica che sono stati caricati circa 450 KB di log.

Passaggi successivi

In questo esempio è stato illustrato come implementare il criterio di fan-out/fan-in. L'esempio successivo illustra come implementare il modello di monitoraggio usando i timer permanenti.