Scenario voor uitwaaiing/inwaaiing in Durable Functions - Voorbeeld van cloudback-up
Artikel
Fan-out/fan-in verwijst naar het patroon van het gelijktijdig uitvoeren van meerdere functies en het vervolgens uitvoeren van een aggregatie op de resultaten. In dit artikel wordt een voorbeeld uitgelegd waarin Durable Functions wordt gebruikt om een scenario voor in-/uitwaaiing te implementeren. Het voorbeeld is een duurzame functie die een back-up maakt van alle of een deel van de site-inhoud van een app in Azure Storage.
Notitie
Versie 4 van het Node.js programmeermodel voor Azure Functions is algemeen beschikbaar. Het nieuwe v4-model is ontworpen voor een flexibelere en intuïtievere ervaring voor JavaScript- en TypeScript-ontwikkelaars. Meer informatie over de verschillen tussen v3 en v4 vindt u in de migratiehandleiding.
In de volgende codefragmenten geeft JavaScript (PM4) het programmeermodel V4 aan, de nieuwe ervaring.
In dit voorbeeld uploaden de functies alle bestanden onder een opgegeven map recursief naar blobopslag. Ze tellen ook het totale aantal bytes dat is geüpload.
Het is mogelijk om één functie te schrijven die alles regelt. Het belangrijkste probleem dat u zou tegenkomen, is schaalbaarheid. Een uitvoering van één functie kan slechts op één virtuele machine worden uitgevoerd, dus de doorvoer wordt beperkt door de doorvoer van die ene VM. Een ander probleem is de betrouwbaarheid. Als er halverwege een fout optreedt of als het hele proces langer dan 5 minuten duurt, kan de back-up mislukken in een gedeeltelijk voltooide status. Vervolgens moet het opnieuw worden opgestart.
Een krachtigere benadering is het schrijven van twee reguliere functies: een zou de bestanden opsommen en de bestandsnamen toevoegen aan een wachtrij, en een andere zou uit de wachtrij lezen en de bestanden uploaden naar blob-opslag. Deze benadering is beter in termen van doorvoer en betrouwbaarheid, maar vereist dat u een wachtrij inricht en beheert. Belangrijker is dat er een aanzienlijke complexiteit wordt geïntroduceerd in termen van statusbeheer en coördinatie als u meer wilt doen, zoals het rapporteren van het totale aantal geüploade bytes.
Een Durable Functions aanpak biedt u alle genoemde voordelen met een zeer lage overhead.
De functies
In dit artikel worden de volgende functies in de voorbeeld-app uitgelegd:
E2_BackupSiteContent: een orchestratorfunctie die aanroept E2_GetFileList om een lijst met bestanden te verkrijgen waarvan u een back-up wilt maken en die vervolgens aanroept E2_CopyFileToBlob om een back-up van elk bestand te maken.
E2_GetFileList: Een activiteitsfunctie die een lijst met bestanden in een map retourneert.
E2_CopyFileToBlob: Een activiteitsfunctie die een back-up maakt van één bestand naar Azure Blob Storage.
E2_BackupSiteContent orchestratorfunctie
Deze orchestratorfunctie doet in wezen het volgende:
Neemt een rootDirectory waarde als invoerparameter.
Roept een functie aan om een recursieve lijst met bestanden onder rootDirectoryop te halen.
Maakt meerdere parallelle functie-aanroepen om elk bestand te uploaden naar Azure Blob Storage.
Wacht totdat alle uploads zijn voltooid.
Retourneert de som van het totale aantal bytes dat is geüpload naar Azure Blob Storage.
Dit is de code waarmee de orchestratorfunctie wordt geïmplementeerd:
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Let op de await Task.WhenAll(tasks); regel. Alle afzonderlijke aanroepen van de E2_CopyFileToBlob functie zijn niet afgewacht, waardoor ze parallel kunnen worden uitgevoerd. Wanneer we deze matrix met taken doorgeven aan Task.WhenAll, krijgen we een taak terug die pas wordt voltooid als alle kopieerbewerkingen zijn voltooid. Als u bekend bent met de TPL (Task Parallel Library) in .NET, is dit niet nieuw voor u. Het verschil is dat deze taken gelijktijdig op meerdere virtuele machines kunnen worden uitgevoerd en de extensie Durable Functions zorgt ervoor dat de end-to-end-uitvoering bestand is tegen procesrecycling.
Nadat we hebben gewacht van Task.WhenAll, weten we dat alle functie-aanroepen zijn voltooid en waarden naar ons hebben geretourneerd. Elke aanroep naar E2_CopyFileToBlob retourneert het aantal geüploade bytes, dus het berekenen van het totale aantal bytes is een kwestie van het optellen van al deze retourwaarden bij elkaar.
De functie maakt gebruik van de standaard function.json voor orchestrator-functies.
Dit is de code waarmee de orchestratorfunctie wordt geïmplementeerd:
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
Let op de yield context.df.Task.all(tasks); regel. Alle afzonderlijke aanroepen naar de E2_CopyFileToBlob functie zijn niet geretourneerd, waardoor ze parallel kunnen worden uitgevoerd. Wanneer we deze matrix met taken doorgeven aan context.df.Task.all, krijgen we een taak terug die pas wordt voltooid als alle kopieerbewerkingen zijn voltooid. Als u bekend bent met Promise.all JavaScript, is dit niet nieuw voor u. Het verschil is dat deze taken gelijktijdig op meerdere virtuele machines kunnen worden uitgevoerd en de extensie Durable Functions zorgt ervoor dat de end-to-end-uitvoering bestand is tegen procesrecycling.
Notitie
Hoewel taken conceptueel vergelijkbaar zijn met JavaScript-beloftes, moeten context.df.Task.all orchestratorfuncties en context.df.Task.any gebruiken in plaats van Promise.all en Promise.race om taakparallellisatie te beheren.
Na het ophalen van context.df.Task.all, weten we dat alle functie-aanroepen zijn voltooid en waarden naar ons hebben geretourneerd. Elke aanroep naar E2_CopyFileToBlob retourneert het aantal geüploade bytes, dus het berekenen van het totale aantal bytes is een kwestie van het optellen van al deze retourwaarden bij elkaar.
Dit is de code waarmee de orchestratorfunctie wordt geïmplementeerd:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
Let op de yield context.df.Task.all(tasks); regel. Alle afzonderlijke aanroepen naar de copyFileToBlob functie zijn niet geretourneerd, waardoor ze parallel kunnen worden uitgevoerd. Wanneer we deze matrix met taken doorgeven aan context.df.Task.all, krijgen we een taak terug die pas wordt voltooid als alle kopieerbewerkingen zijn voltooid. Als u bekend bent met Promise.all JavaScript, is dit niet nieuw voor u. Het verschil is dat deze taken gelijktijdig op meerdere virtuele machines kunnen worden uitgevoerd en de extensie Durable Functions zorgt ervoor dat de end-to-end-uitvoering bestand is tegen procesrecycling.
Notitie
Hoewel taken conceptueel vergelijkbaar zijn met JavaScript-beloftes, moeten context.df.Task.all orchestratorfuncties en context.df.Task.any gebruiken in plaats van Promise.all en Promise.race om taakparallellisatie te beheren.
Na het ophalen van context.df.Task.all, weten we dat alle functie-aanroepen zijn voltooid en waarden naar ons hebben geretourneerd. Elke aanroep naar copyFileToBlob retourneert het aantal geüploade bytes, dus het berekenen van het totale aantal bytes is een kwestie van het optellen van al deze retourwaarden bij elkaar.
De functie maakt gebruik van de standaard function.json voor orchestrator-functies.
Dit is de code waarmee de orchestratorfunctie wordt geïmplementeerd:
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Let op de yield context.task_all(tasks); regel. Alle afzonderlijke aanroepen naar de E2_CopyFileToBlob functie zijn niet geretourneerd, waardoor ze parallel kunnen worden uitgevoerd. Wanneer we deze matrix met taken doorgeven aan context.task_all, krijgen we een taak terug die pas wordt voltooid als alle kopieerbewerkingen zijn voltooid. Als u bekend bent met asyncio.gather python, is dit niet nieuw voor u. Het verschil is dat deze taken gelijktijdig op meerdere virtuele machines kunnen worden uitgevoerd en de extensie Durable Functions zorgt ervoor dat de end-to-end-uitvoering bestand is tegen procesrecycling.
Notitie
Hoewel taken conceptueel vergelijkbaar zijn met Python awaitables, moeten orchestratorfuncties de context.task_all API's en context.task_any gebruiken yield voor het beheren van taakparallellisatie.
Na het ophalen van context.task_all, weten we dat alle functie-aanroepen zijn voltooid en waarden naar ons hebben geretourneerd. Elke aanroep naar E2_CopyFileToBlob retourneert het aantal geüploade bytes, zodat we het totale aantal byte's kunnen berekenen door alle retourwaarden bij elkaar op te tellen.
Helper-activiteitsfuncties
De helperactiviteitsfuncties zijn, net als bij andere voorbeelden, gewoon normale functies die gebruikmaken van de activityTrigger triggerbinding.
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
Notitie
U vraagt zich misschien af waarom u deze code niet rechtstreeks in de orchestratorfunctie kunt plaatsen. Dat zou kunnen, maar dit zou een van de fundamentele regels van orchestratorfuncties breken, namelijk dat ze nooit I/O mogen uitvoeren, inclusief toegang tot het lokale bestandssysteem. Zie Beperkingen voor Orchestrator-functiecode voor meer informatie.
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Notitie
U moet het Microsoft.Azure.WebJobs.Extensions.Storage NuGet-pakket installeren om de voorbeeldcode uit te voeren.
De functie maakt gebruik van enkele geavanceerde functies van Azure Functions bindingen (dat wil gezegd, het gebruik van de Binder parameter), maar u hoeft zich geen zorgen te maken over deze details voor het doel van dit scenario.
Het bestand function.json voor E2_CopyFileToBlob is op dezelfde manier eenvoudig:
De Python-implementatie maakt gebruik van de Azure Storage SDK voor Python om de bestanden te uploaden naar Azure Blob Storage.
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
Tijdens de implementatie wordt het bestand van de schijf geladen en wordt de inhoud asynchroon gestreamd naar een blob met dezelfde naam in de container 'back-ups'. De retourwaarde is het aantal bytes dat naar de opslag wordt gekopieerd, die vervolgens door de orchestratorfunctie wordt gebruikt om de geaggregeerde som te berekenen.
Notitie
Dit is een perfect voorbeeld van het verplaatsen van I/O-bewerkingen naar een activityTrigger functie. Het werk kan niet alleen worden verdeeld over veel verschillende machines, maar u krijgt ook de voordelen van het controleren van de voortgang. Als het hostproces om welke reden dan ook wordt beëindigd, weet u welke uploads al zijn voltooid.
De voorbeeldtoepassing uitvoeren
U kunt de indeling in Windows starten door de volgende HTTP POST-aanvraag te verzenden.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
In een Linux-functie-app (Python wordt momenteel alleen uitgevoerd op Linux voor App Service), kunt u de indeling als volgt starten:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Notitie
De HttpStart functie die u aanroept, werkt alleen met inhoud in JSON-indeling. Daarom is de Content-Type: application/json header vereist en wordt het mappad gecodeerd als een JSON-tekenreeks. Bovendien wordt in het HTTP-fragment ervan uitgegaan dat er een vermelding in het host.json bestand is die het standaardvoorvoegsel api/ verwijdert uit alle URL's voor HTTP-triggerfuncties. U vindt de markeringen voor deze configuratie in het host.json bestand in de voorbeelden.
Deze HTTP-aanvraag activeert de E2_BackupSiteContent orchestrator en geeft de tekenreeks D:\home\LogFiles door als een parameter. Het antwoord bevat een koppeling om de status van de back-upbewerking op te halen:
Afhankelijk van het aantal logboekbestanden dat uw functie-app bevat, kan deze bewerking enkele minuten duren. U kunt de meest recente status ophalen door een query uit te voeren op de URL in de Location header van het vorige HTTP 202-antwoord.
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
In dit geval wordt de functie nog steeds uitgevoerd. U kunt de invoer zien die is opgeslagen in de orchestratorstatus en het tijdstip van de laatste update. U kunt de Location headerwaarden blijven gebruiken om te peilen naar voltooiing. Wanneer de status 'Voltooid' is, ziet u een HTTP-antwoordwaarde die er ongeveer als volgt uitziet:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
U kunt nu zien dat de indeling is voltooid en hoeveel tijd het ongeveer kostte om te voltooien. U ziet ook een waarde voor het output veld, wat aangeeft dat er ongeveer 450 kB aan logboeken is geüpload.
Volgende stappen
In dit voorbeeld is getoond hoe u het patroon voor uitwaaiers/inwaaiers implementeert. In het volgende voorbeeld ziet u hoe u het monitorpatroon implementeert met behulp van duurzame timers.