Fan-out/fan-in scenario in Durable Functions - Voorbeeld van cloudback-up
Artikel
Fan-out/fan-in verwijst naar het patroon van het gelijktijdig uitvoeren van meerdere functies en voert vervolgens een aggregatie uit op de resultaten. In dit artikel wordt een voorbeeld uitgelegd waarin Durable Functions wordt gebruikt om een fan-in-/fan-outscenario te implementeren. Het voorbeeld is een duurzame functie waarmee een back-up wordt gemaakt van alle of sommige site-inhoud van een app in Azure Storage.
Notitie
Versie 4 van het Node.js programmeermodel voor Azure Functions is algemeen beschikbaar. Het nieuwe v4-model is ontworpen voor een flexibelere en intuïtievere ervaring voor JavaScript- en TypeScript-ontwikkelaars. Meer informatie over de verschillen tussen v3 en v4 in de migratiehandleiding.
In de volgende codefragmenten geeft JavaScript (PM4) het programmeermodel V4 aan, de nieuwe ervaring.
In dit voorbeeld uploaden de functies alle bestanden onder een opgegeven map recursief naar blobopslag. Ze tellen ook het totale aantal bytes dat is geüpload.
Het is mogelijk om één functie te schrijven die voor alles zorgt. Het belangrijkste probleem dat u zou tegenkomen, is schaalbaarheid. Een uitvoering van één functie kan alleen worden uitgevoerd op één virtuele machine, zodat de doorvoer wordt beperkt door de doorvoer van die ene virtuele machine. Een ander probleem is betrouwbaarheid. Als er halverwege een fout is opgetreden of als het hele proces langer dan 5 minuten duurt, kan de back-up mislukken in een gedeeltelijk voltooide status. Deze moet vervolgens opnieuw worden opgestart.
Een krachtigere benadering is het schrijven van twee normale functies: een zou de bestanden opsommen en de bestandsnamen toevoegen aan een wachtrij, en een andere zou lezen uit de wachtrij en de bestanden uploaden naar blobopslag. Deze aanpak is beter qua doorvoer en betrouwbaarheid, maar hiervoor moet u een wachtrij inrichten en beheren. Belangrijker is dat er een aanzienlijke complexiteit wordt geïntroduceerd in termen van statusbeheer en coördinatie als u iets meer wilt doen, zoals het totale aantal geüploade bytes rapporteren.
Een Durable Functions-benadering biedt u alle genoemde voordelen met een zeer lage overhead.
De functies
In dit artikel worden de volgende functies in de voorbeeld-app uitgelegd:
E2_BackupSiteContent: Een orchestratorfunctie die aanroept E2_GetFileList om een lijst met bestanden te verkrijgen waarvan een back-up moet worden gemaakt, en roept E2_CopyFileToBlob vervolgens een back-up van elk bestand aan.
E2_GetFileList: Een activiteitsfunctie die een lijst met bestanden in een map retourneert.
E2_CopyFileToBlob: Een activiteitsfunctie die een back-up maakt van één bestand naar Azure Blob Storage.
E2_BackupSiteContent orchestratorfunctie
Deze orchestratorfunctie doet in wezen het volgende:
Neemt een rootDirectory waarde als invoerparameter.
Roept een functie aan om een recursieve lijst met bestanden onder rootDirectoryte krijgen.
Maakt meerdere parallelle functie-aanroepen om elk bestand te uploaden naar Azure Blob Storage.
Wacht totdat alle uploads zijn voltooid.
Retourneert de som van het totale aantal bytes dat is geüpload naar Azure Blob Storage.
Hier volgt de code waarmee de orchestratorfunctie wordt geïmplementeerd:
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Let op de await Task.WhenAll(tasks); regel. Alle afzonderlijke aanroepen naar de E2_CopyFileToBlob functie zijn niet in afwachting, waardoor ze parallel kunnen worden uitgevoerd. Wanneer we deze reeks taken Task.WhenAlldoorgeven, krijgen we een taak terug die niet wordt voltooid totdat alle kopieerbewerkingen zijn voltooid. Als u bekend bent met de TPL (Task Parallel Library) in .NET, is dit niet nieuw voor u. Het verschil is dat deze taken gelijktijdig op meerdere virtuele machines kunnen worden uitgevoerd en dat de Durable Functions-extensie ervoor zorgt dat de end-to-end-uitvoering bestand is tegen het verwerken van recycling.
Nadat we Task.WhenAllhebben gewacht, weten we dat alle functieaanroepen zijn voltooid en waarden terug naar ons hebben geretourneerd. Elke aanroep om E2_CopyFileToBlob het aantal geüploade bytes te retourneren, dus het berekenen van het totale byteaantal is een kwestie van het samenvoegen van al deze retourwaarden.
De functie maakt gebruik van de standaard function.json voor orchestratorfuncties.
Hier volgt de code waarmee de orchestratorfunctie wordt geïmplementeerd:
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
Let op de yield context.df.Task.all(tasks); regel. Alle afzonderlijke aanroepen naar de E2_CopyFileToBlob functie zijn niet geretourneerd, waardoor ze parallel kunnen worden uitgevoerd. Wanneer we deze reeks taken context.df.Task.alldoorgeven, krijgen we een taak terug die niet wordt voltooid totdat alle kopieerbewerkingen zijn voltooid. Als u bekend bent met Promise.all JavaScript, is dit niet nieuw voor u. Het verschil is dat deze taken gelijktijdig op meerdere virtuele machines kunnen worden uitgevoerd en dat de Durable Functions-extensie ervoor zorgt dat de end-to-end-uitvoering bestand is tegen het verwerken van recycling.
Notitie
Hoewel taken conceptueel vergelijkbaar zijn met JavaScript-beloften, moeten orchestrator-functies worden gebruikt context.df.Task.all en context.df.Task.any in plaats van Promise.all en Promise.race om taakparallellisatie te beheren.
Na het opleveren van context.df.Task.all, weten we dat alle functieaanroepen zijn voltooid en waarden hebben geretourneerd naar ons. Elke aanroep om E2_CopyFileToBlob het aantal geüploade bytes te retourneren, dus het berekenen van het totale byteaantal is een kwestie van het samenvoegen van al deze retourwaarden.
Hier volgt de code waarmee de orchestratorfunctie wordt geïmplementeerd:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
Let op de yield context.df.Task.all(tasks); regel. Alle afzonderlijke aanroepen naar de copyFileToBlob functie zijn niet geretourneerd, waardoor ze parallel kunnen worden uitgevoerd. Wanneer we deze reeks taken context.df.Task.alldoorgeven, krijgen we een taak terug die niet wordt voltooid totdat alle kopieerbewerkingen zijn voltooid. Als u bekend bent met Promise.all JavaScript, is dit niet nieuw voor u. Het verschil is dat deze taken gelijktijdig op meerdere virtuele machines kunnen worden uitgevoerd en dat de Durable Functions-extensie ervoor zorgt dat de end-to-end-uitvoering bestand is tegen het verwerken van recycling.
Notitie
Hoewel taken conceptueel vergelijkbaar zijn met JavaScript-beloften, moeten orchestrator-functies worden gebruikt context.df.Task.all en context.df.Task.any in plaats van Promise.all en Promise.race om taakparallellisatie te beheren.
Na het opleveren van context.df.Task.all, weten we dat alle functieaanroepen zijn voltooid en waarden hebben geretourneerd naar ons. Elke aanroep om copyFileToBlob het aantal geüploade bytes te retourneren, dus het berekenen van het totale byteaantal is een kwestie van het samenvoegen van al deze retourwaarden.
De functie maakt gebruik van de standaard function.json voor orchestratorfuncties.
Hier volgt de code waarmee de orchestratorfunctie wordt geïmplementeerd:
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Let op de yield context.task_all(tasks); regel. Alle afzonderlijke aanroepen naar de E2_CopyFileToBlob functie zijn niet geretourneerd, waardoor ze parallel kunnen worden uitgevoerd. Wanneer we deze reeks taken context.task_alldoorgeven, krijgen we een taak terug die niet wordt voltooid totdat alle kopieerbewerkingen zijn voltooid. Als u bekend bent met asyncio.gather Python, is dit niet nieuw voor u. Het verschil is dat deze taken gelijktijdig op meerdere virtuele machines kunnen worden uitgevoerd en dat de Durable Functions-extensie ervoor zorgt dat de end-to-end-uitvoering bestand is tegen het verwerken van recycling.
Notitie
Hoewel taken conceptueel vergelijkbaar zijn met Python awaitables, moeten orchestratorfuncties evenals de context.task_all en context.task_any API's voor het beheren van taakparallelisatie worden gebruiktyield.
Na het opleveren van context.task_all, weten we dat alle functieaanroepen zijn voltooid en waarden hebben geretourneerd naar ons. Elke aanroep om E2_CopyFileToBlob het aantal geüploade bytes te retourneren, zodat we het totale byteaantal kunnen berekenen door alle retourwaarden samen te voegen.
Helper-activiteitsfuncties
De helperactiviteitsfuncties, net als bij andere voorbeelden, zijn gewoon reguliere functies die gebruikmaken van de activityTrigger triggerbinding.
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
Notitie
U vraagt zich misschien af waarom u deze code niet alleen rechtstreeks in de orchestratorfunctie kunt plaatsen. U kunt, maar dit zou een van de fundamentele regels van orchestratorfuncties breken, wat inhoudt dat ze nooit I/O mogen uitvoeren, inclusief toegang tot het lokale bestandssysteem. Zie Codebeperkingen voor Orchestrator-functies voor meer informatie.
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Notitie
U moet het Microsoft.Azure.WebJobs.Extensions.Storage NuGet-pakket installeren om de voorbeeldcode uit te voeren.
De functie maakt gebruik van enkele geavanceerde functies van Azure Functions-bindingen (het gebruik van de Binder parameter), maar u hoeft zich geen zorgen te maken over deze details voor dit scenario.
Het function.json-bestand voor E2_CopyFileToBlob is vergelijkbaar eenvoudig:
De Python-implementatie maakt gebruik van de Azure Storage SDK voor Python om de bestanden te uploaden naar Azure Blob Storage.
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
De implementatie laadt het bestand vanaf de schijf en streamt de inhoud asynchroon naar een blob met dezelfde naam in de container 'back-ups'. De retourwaarde is het aantal bytes dat naar de opslag is gekopieerd, dat vervolgens door de orchestratorfunctie wordt gebruikt om de geaggregeerde som te berekenen.
Notitie
Dit is een perfect voorbeeld van het verplaatsen van I/O-bewerkingen naar een activityTrigger functie. Het werk kan niet alleen worden verdeeld over veel verschillende machines, maar u krijgt ook de voordelen van het controleren van de voortgang. Als het hostproces om welke reden dan ook wordt beëindigd, weet u welke uploads al zijn voltooid.
De voorbeeldtoepassing uitvoeren
U kunt de indeling in Windows starten door de volgende HTTP POST-aanvraag te verzenden.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
U kunt ook op een Linux-functie-app (Python wordt momenteel alleen uitgevoerd op Linux voor App Service) de indeling als volgt starten:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Notitie
De HttpStart functie die u aanroept, werkt alleen met inhoud met JSON-indeling. Daarom is de Content-Type: application/json header vereist en wordt het mappad gecodeerd als een JSON-tekenreeks. Bovendien wordt ervan uitgegaan dat er een vermelding in het host.json bestand is die het standaardvoorvoegsel api/ verwijdert uit alle URL's van HTTP-triggerfuncties. U vindt de opmaak voor deze configuratie in het host.json bestand in de voorbeelden.
Deze HTTP-aanvraag activeert de E2_BackupSiteContent orchestrator en geeft de tekenreeks D:\home\LogFiles door als een parameter. Het antwoord bevat een koppeling om de status van de back-upbewerking op te halen:
Afhankelijk van het aantal logboekbestanden in uw functie-app, kan het enkele minuten duren voordat deze bewerking is voltooid. U kunt de meest recente status ophalen door een query uit te voeren op de URL in de Location header van het vorige HTTP 202-antwoord.
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
In dit geval wordt de functie nog steeds uitgevoerd. U kunt de invoer zien die is opgeslagen in de orchestratorstatus en de laatst bijgewerkte tijd. U kunt de Location headerwaarden blijven gebruiken om te peilen naar voltooiing. Wanneer de status 'Voltooid' is, ziet u een HTTP-antwoordwaarde die er ongeveer als volgt uitziet:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
U kunt nu zien dat de indeling is voltooid en ongeveer hoeveel tijd het kostte om de indeling te voltooien. U ziet ook een waarde voor het output veld, wat aangeeft dat er ongeveer 450 kB aan logboeken is geüpload.
Volgende stappen
In dit voorbeeld ziet u hoe u het fan-out/fan-in patroon implementeert. In het volgende voorbeeld ziet u hoe u het monitorpatroon implementeert met behulp van duurzame timers.