Scénario fan-out/fan-in dans Fonctions durables - exemple de sauvegarde cloud
Article
Fan-out/fan-in fait référence à un modèle incluant une exécution simultanée de plusieurs fonctions puis une agrégation à partir des résultats. Cet article décrit un exemple utilisant Fonctions durables pour implémenter un scénario fan-in/fan-out. L’exemple représente une fonction durable qui sauvegarde tout ou partie du contenu du site d’une application dans Stockage Azure.
Remarque
La version 4 du modèle de programmation Node.js pour Azure Functions est en disponibilité générale. Le nouveau modèle v4 est conçu pour offrir une expérience plus flexible et intuitive pour les développeurs JavaScript et TypeScript. En savoir plus sur les différences entre v3 et v4 dans le guide de migration.
Dans les extraits de code suivants, JavaScript (PM4) désigne le modèle de programmation V4, la nouvelle expérience.
Dans cet exemple, les fonctions chargent tous les fichiers dans un répertoire spécifié de manière récursive dans le stockage d’objets blob. Elles comptent également le nombre total d’octets qui ont été chargés.
Il est possible d’écrire une fonction unique qui prend tout en charge. Le principal problème que vous risquez de rencontrer est l’évolutivité. Comme une fonction unique ne peut s’exécuter que sur une seule machine virtuelle, son débit est limité par celui de cette machine virtuelle. Un autre problème est la fiabilité. Si une défaillance survient en cours de route, ou que l’ensemble du processus prend plus de cinq minutes, la sauvegarde risque d’échouer dans un état partiellement terminé. Le processus devra alors être redémarré.
Une approche plus robuste consiste à écrire deux fonctions régulières : une pour énumérer les fichiers et ajouter les noms de fichiers à une file d’attente, et une autre pour lire à partir de la file d’attente et charger les fichiers vers le stockage d’objets blob. Cette approche est préférable du point de vue du débit et de la fiabilité, mais elle oblige à configurer et à gérer une file d’attente. Plus important encore, elle ajoute une complexité significative en termes de gestion d’état et de coordination si vous souhaitez effectuer une tâche supplémentaire, comme calculer le nombre total d’octets chargés.
Une approche Fonctions durables vous offre tous les avantages mentionnés, à peu de frais.
Les fonctions
Cet article explique les fonctions suivantes dans l’exemple d’application :
E2_BackupSiteContent: orchestrateur de fonction qui appelle E2_GetFileList pour obtenir une liste de fichiers à sauvegarder, puis appelle E2_CopyFileToBlob pour sauvegarder chaque fichier.
E2_GetFileList: fonction d’activité qui retourne une liste de fichiers dans un répertoire.
E2_CopyFileToBlob: fonction d’activité qui sauvegarde un fichier dans le stockage Blob Azure.
Fonction d’orchestrateur E2_BackupSiteContent
Cette fonction d’orchestrateur effectue essentiellement les opérations suivantes :
Prend une valeur rootDirectory comme paramètre d’entrée.
Appelle une fonction pour obtenir une liste récursive des fichiers sous rootDirectory.
Effectue plusieurs appels parallèles de fonction pour charger chaque fichier dans Stockage Blob Azure.
Attend la fin de tous les chargements.
Retourne le nombre total d’octets qui ont été chargés dans Stockage Blob Azure.
Voici le code qui implémente la fonction d’orchestrateur :
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Notez la ligne await Task.WhenAll(tasks);. Aucun des différents appels à la fonction E2_CopyFileToBlob n’a été attendu, ce qui leur permet de s’exécuter en parallèle. Lorsque nous transmettons ce tableau de tâches à Task.WhenAll, nous obtenons une tâche qui ne se termine pas tant que toutes les opérations de copie ne sont pas finies. Si vous connaissez la bibliothèque parallèle de tâches (Task Parallel Library, TPL) dans .NET, cela n’est pas nouveau pour vous. La différence est que ces tâches peuvent s’exécuter simultanément sur plusieurs machines virtuelles, et que l’extension Durable Functions garantit que l’exécution de bout en bout n’est pas interrompue par un recyclage de processus.
Après avoir attendu Task.WhenAll, nous savons que tous les appels à la fonction sont terminés et nous ont retourné des valeurs. Chaque appel à E2_CopyFileToBlob renvoie le nombre d’octets chargés. Pour calculer le nombre total d’octets, il suffit donc d’additionner toutes ces valeurs retournées.
La fonction utilise le fichier function.json standard pour les fonctions d’orchestrateur.
Voici le code qui implémente la fonction d’orchestrateur :
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
Notez la ligne yield context.df.Task.all(tasks);. Aucun des différents appels à la fonction E2_CopyFileToBlob n’a été interrompu, ce qui leur permet de s’exécuter en parallèle. Lorsque nous transmettons ce tableau de tâches à context.df.Task.all, nous obtenons une tâche qui ne se termine pas tant que toutes les opérations de copie ne sont pas finies. Si vous connaissez déjà Promise.all en JavaScript, ce n’est pas une nouveauté pour vous. La différence est que ces tâches peuvent s’exécuter simultanément sur plusieurs machines virtuelles, et que l’extension Durable Functions garantit que l’exécution de bout en bout n’est pas interrompue par un recyclage de processus.
Notes
Bien que les tâches soient conceptuellement similaires aux promesses JavaScript, les fonctions d’orchestrateur doivent utiliser context.df.Task.all et context.df.Task.any plutôt que Promise.all et Promise.race pour gérer la parallélisation de la tâche.
Maintenant que nous avons interrompu l’exécution à partir de context.df.Task.all, nous savons que tous les appels à la fonction sont terminés et nous ont retourné des valeurs. Chaque appel à E2_CopyFileToBlob renvoie le nombre d’octets chargés. Pour calculer le nombre total d’octets, il suffit donc d’additionner toutes ces valeurs retournées.
Voici le code qui implémente la fonction d’orchestrateur :
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
Notez la ligne yield context.df.Task.all(tasks);. Aucun des différents appels à la fonction copyFileToBlob n’a été interrompu, ce qui leur permet de s’exécuter en parallèle. Lorsque nous transmettons ce tableau de tâches à context.df.Task.all, nous obtenons une tâche qui ne se termine pas tant que toutes les opérations de copie ne sont pas finies. Si vous connaissez déjà Promise.all en JavaScript, ce n’est pas une nouveauté pour vous. La différence est que ces tâches peuvent s’exécuter simultanément sur plusieurs machines virtuelles, et que l’extension Durable Functions garantit que l’exécution de bout en bout n’est pas interrompue par un recyclage de processus.
Notes
Bien que les tâches soient conceptuellement similaires aux promesses de JavaScript, les fonctions d’orchestrateur doivent utiliser context.df.Task.all et context.df.Task.any plutôt que Promise.all et Promise.race pour gérer la parallélisation de tâche.
Maintenant que nous avons interrompu l’exécution à partir de context.df.Task.all, nous savons que tous les appels à la fonction sont terminés et nous ont retourné des valeurs. Chaque appel à copyFileToBlob renvoie le nombre d’octets chargés. Pour calculer le nombre total d’octets, il suffit donc d’additionner toutes ces valeurs retournées.
La fonction utilise le fichier function.json standard pour les fonctions d’orchestrateur.
Voici le code qui implémente la fonction d’orchestrateur :
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Notez la ligne yield context.task_all(tasks);. Aucun des différents appels à la fonction E2_CopyFileToBlob n’a été interrompu, ce qui leur permet de s’exécuter en parallèle. Lorsque nous transmettons ce tableau de tâches à context.task_all, nous obtenons une tâche qui ne se termine pas tant que toutes les opérations de copie ne sont pas finies. Si vous connaissez déjà asyncio.gather dans Python, ce n’est pas une nouveauté pour vous. La différence est que ces tâches peuvent s’exécuter simultanément sur plusieurs machines virtuelles, et que l’extension Durable Functions garantit que l’exécution de bout en bout n’est pas interrompue par un recyclage de processus.
Notes
Bien que les tâches soient conceptuellement similaires aux promesses Python, les fonctions d’orchestrateur doivent utiliser yield, ainsi que les API context.task_all et context.task_any pour gérer la parallélisation de la tâche.
Maintenant que nous avons interrompu l’exécution à partir de context.task_all, nous savons que tous les appels à la fonction sont terminés et nous ont retourné des valeurs. Chaque appel à E2_CopyFileToBlob retourne le nombre d’octets chargés, ce qui nous permet de calculer le nombre total d’octets en additionnant toutes les valeurs retournées.
Fonctions d’activité d’assistance
Les fonctions d’activité d’assistance, tout comme avec d’autres exemples, sont des fonctions standard qui utilisent la liaison de déclencheur activityTrigger.
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
Notes
Vous vous demandez peut-être pourquoi ne pas simplement placer ce code directement dans la fonction d’orchestrateur ? C’est possible, mais cela compromettrait une des règles fondamentales des fonctions d’orchestrateur, à savoir qu’elles ne doivent effectuer d’opérations E/S, y compris avec accès au système de fichiers local. Pour plus d’informations, consultez Contraintes du code des fonctions d’orchestrateur.
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Notes
Vous devez installer le package NuGet Microsoft.Azure.WebJobs.Extensions.Storage pour exécuter l’exemple de code.
La fonction utilise certaines fonctionnalités avancées des liaisons d’Azure Functions (autrement dit, le paramètre Binder). Toutefois, vous n’avez pas à vous soucier de ces détails dans cette procédure pas à pas.
Le fichier function.json pour E2_CopyFileToBlob est tout aussi simple :
L’implémentation Python utilise le SDK Stockage Azure pour Python afin de charger les fichiers dans le Stockage Blob Azure.
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
L’implémentation charge le fichier à partir du disque et transmet de manière asynchrone le contenu vers un objet blob du même nom dans le conteneur « backups ». La valeur de retour correspond au nombre d’octets copiés vers le stockage, utilisée ensuite par la fonction d’orchestrateur pour calculer la somme d’agrégation.
Notes
Il s’agit d’un exemple parfait de déplacement d’opérations d’E/S vers une fonction activityTrigger. Non seulement le travail peut être réparti sur plusieurs machines différentes, mais vous avez également la possibilité de créer des points de contrôle tout au long de la progression. Si le processus hôte est interrompu pour une raison quelconque, vous savez que les chargements ont déjà été effectués.
Exécution de l'exemple
Vous pouvez démarrer l’orchestration, sur Windows, en envoyant la requête HTTP POST suivante.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
En guise d’alternative, sur une application de fonction Linux (actuellement, Python s’exécute uniquement sur Linux pour App Service), vous pouvez démarrer l’orchestration comme suit :
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Notes
La fonction HttpStart que vous appelez fonctionne uniquement avec un contenu au format JSON. Pour cette raison, l’en-tête Content-Type: application/json est requis, et le chemin d’accès au répertoire est encodé sous forme de chaîne JSON. De plus, l’extrait de code HTTP suppose qu’il existe une entrée dans le fichier host.json, qui supprime le préfixe api/ par défaut de toutes les URL de fonctions de déclencheur HTTP. Le balisage pour cette configuration figure dans le fichier host.json dans les exemples.
Cette requête HTTP déclenche l’orchestrateur E2_BackupSiteContent et transmet la chaîne D:\home\LogFiles en tant que paramètre. La réponse fournit un lien pour obtenir l’état de l’opération de sauvegarde :
Selon le nombre de fichiers journaux que contient votre application de fonction, cette opération peut prendre plusieurs minutes. Vous pouvez obtenir l’état le plus récent en interrogeant l’URL dans l’en-tête Location de la réponse HTTP 202 précédente.
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
Dans ce cas, la fonction est toujours en cours d’exécution. Vous pouvez voir l’entrée enregistrée dans l’état de l’orchestrateur et l’heure de la dernière mise à jour. Vous pouvez continuer à utiliser les valeurs d’en-tête Location jusqu’à la fin de l’interrogation. Lorsque l’état indique « Completed » (Terminé), une valeur de réponse HTTP semblable à ce qui suit apparaît :
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
Vous pouvez maintenant voir que l’orchestration est terminée et la durée approximative de cette opération. Vous voyez également une valeur pour le champ output, indiquant qu’environ 450 ko de journaux d’activité ont été chargés.
Étapes suivantes
Cet exemple a montré comment implémenter le modèle fan-out/fan-in. L’exemple suivant montre comment implémenter le modèle de surveillance à l’aide de minuteurs durables.