Fan-out/fan-in forgatókönyv a Durable Functionsben – Példa felhőbeli biztonsági mentésre
Cikk
A fan-out/fan-in a több függvény egyidejű végrehajtásának, majd az eredmények összesítésének mintázatára utal. Ez a cikk egy olyan mintát ismertet, amely a Durable Functions használatával valósít meg egy ventilátoros/kifúvatásos forgatókönyvet. A minta egy tartós függvény, amely biztonsági másolatot készít egy alkalmazás webhelytartalmairól vagy egy részéről az Azure Storage-ba.
Feljegyzés
Az Azure Functions Node.js programozási modelljének 4- es verziója általánosan elérhető. Az új v4-modell úgy lett kialakítva, hogy rugalmasabb és intuitívabb felhasználói élményt nyújtson JavaScript- és TypeScript-fejlesztők számára. A migrálási útmutatóban további információt olvashat a v3 és a v4 közötti különbségekről.
A következő kódrészletekben a JavaScript (PM4) a V4 programozási modellt, az új felületet jelöli.
Ebben a mintában a függvények rekurzív módon töltik fel az összes fájlt egy adott könyvtárban a Blob Storage-ba. A feltöltött bájtok teljes számát is megszámolják.
Egyetlen függvényt is írhat, amely mindenről gondoskodik. A legfőbb probléma a méretezhetőség. Egyetlen függvény végrehajtása csak egyetlen virtuális gépen futtatható, így az átviteli sebességet az adott virtuális gép átviteli sebessége korlátozza. Egy másik probléma a megbízhatóság. Ha a folyamat közepén hiba történik, vagy ha a teljes folyamat több mint 5 percet vesz igénybe, a biztonsági mentés részben befejezett állapotban meghiúsulhat. Ezután újra kell indítani.
Egy robusztusabb módszer két normál függvény írása: az egyik számba venné a fájlokat, és hozzáadná a fájlneveket egy üzenetsorhoz, egy másik pedig felolvassa az üzenetsorból, és feltöltené a fájlokat a Blob Storage-ba. Ez a megközelítés jobb az átviteli sebesség és a megbízhatóság szempontjából, de szükség van egy üzenetsor kiépítésére és kezelésére. Ennél is fontosabb, hogy az állapotkezelés és a koordináció szempontjából jelentős összetettség jelenik meg, ha többet szeretne tenni, például a feltöltött bájtok teljes számának jelentéséhez.
A Durable Functions-megközelítés az összes említett előnyt biztosítja, nagyon alacsony többletterhelés mellett.
A függvények
Ez a cikk a mintaalkalmazás alábbi funkcióit ismerteti:
E2_BackupSiteContent: Egy vezénylő függvény , amely meghívja E2_GetFileList a biztonsági mentéshez szükséges fájlok listájának lekérését, majd meghívja E2_CopyFileToBlob az egyes fájlok biztonsági mentését.
E2_GetFileList: Egy tevékenységfüggvény , amely egy könyvtárban lévő fájlok listáját adja vissza.
E2_CopyFileToBlob: Egy tevékenységfüggvény, amely egyetlen fájlról készít biztonsági másolatot az Azure Blob Storage-ba.
E2_BackupSiteContent vezénylő függvény
Ez a vezénylő függvény lényegében a következőket teszi:
rootDirectory Egy értéket bemeneti paraméterként vesz fel.
Függvényt hív meg a fájlok rootDirectoryrekurzív listájának lekéréséhez.
Több párhuzamos függvényhívást indít az egyes fájlok Azure Blob Storage-ba való feltöltéséhez.
Megvárja, amíg az összes feltöltés befejeződik.
Az Azure Blob Storage-ba feltöltött bájtok összegét adja vissza.
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
Figyelje meg a await Task.WhenAll(tasks); sort. A függvény minden E2_CopyFileToBlob egyes hívása nem várt, ami lehetővé teszi számukra, hogy párhuzamosan fussanak. Amikor átadjuk ezt a feladattömböt Task.WhenAll, egy olyan feladatot kapunk vissza, amely nem fejeződik be , amíg az összes másolási művelet be nem fejeződik. Ha ismeri a .NET-beli feladat-párhuzamos kódtárat (TPL), akkor ez nem újdonság Önnek. A különbség az, hogy ezek a feladatok egyszerre több virtuális gépen is futhatnak, és a Durable Functions bővítmény biztosítja, hogy a végpontok közötti végrehajtás rugalmas legyen az újrafeldolgozási folyamatokkal szemben.
A várakozás Task.WhenAllután tudjuk, hogy az összes függvényhívás befejeződött, és visszaadta nekünk az értékeket. Minden hívás E2_CopyFileToBlob a feltöltött bájtok számát adja vissza, ezért a teljes bájtszám kiszámításához össze kell adni az összes visszaadott értéket.
A függvény a szabványos function.json használja a vezénylési függvényekhez.
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
Figyelje meg a yield context.df.Task.all(tasks); sort. A függvényhez intézett E2_CopyFileToBlob összes egyes hívás nem lett engedélyezve, ami lehetővé teszi számukra, hogy párhuzamosan fussanak. Amikor átadjuk ezt a feladattömböt context.df.Task.all, egy olyan feladatot kapunk vissza, amely nem fejeződik be , amíg az összes másolási művelet be nem fejeződik. Ha ismeri Promise.all a JavaScriptet, akkor ez nem újdonság Önnek. A különbség az, hogy ezek a feladatok egyszerre több virtuális gépen is futhatnak, és a Durable Functions bővítmény biztosítja, hogy a végpontok közötti végrehajtás rugalmas legyen az újrafeldolgozási folyamatokkal szemben.
Feljegyzés
Bár a feladatok fogalmilag hasonlóak a JavaScript-ígéretekhez, a vezénylő függvények a feladat párhuzamosítását Promise.all és helyett és Promise.race kezelését kell használniuk context.df.Task.allcontext.df.Task.any.
A visszatérés context.df.Task.allután tudjuk, hogy az összes függvényhívás befejeződött, és visszaadta nekünk az értékeket. Minden hívás E2_CopyFileToBlob a feltöltött bájtok számát adja vissza, ezért a teljes bájtszám kiszámításához össze kell adni az összes visszaadott értéket.
A vezénylő függvényt megvalósító kód a következő:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
Figyelje meg a yield context.df.Task.all(tasks); sort. A függvényhez intézett copyFileToBlob összes egyes hívás nem lett engedélyezve, ami lehetővé teszi számukra, hogy párhuzamosan fussanak. Amikor átadjuk ezt a feladattömböt context.df.Task.all, egy olyan feladatot kapunk vissza, amely nem fejeződik be , amíg az összes másolási művelet be nem fejeződik. Ha ismeri Promise.all a JavaScriptet, akkor ez nem újdonság Önnek. A különbség az, hogy ezek a feladatok egyszerre több virtuális gépen is futhatnak, és a Durable Functions bővítmény biztosítja, hogy a végpontok közötti végrehajtás rugalmas legyen az újrafeldolgozási folyamatokkal szemben.
Feljegyzés
Bár a feladatok fogalmilag hasonlóak a JavaScript-ígéretekhez, a vezénylő függvények a feladat párhuzamosítását Promise.all és helyett és Promise.race kezelését kell használniuk context.df.Task.allcontext.df.Task.any.
A visszatérés context.df.Task.allután tudjuk, hogy az összes függvényhívás befejeződött, és visszaadta nekünk az értékeket. Minden hívás copyFileToBlob a feltöltött bájtok számát adja vissza, ezért a teljes bájtszám kiszámításához össze kell adni az összes visszaadott értéket.
A függvény a szabványos function.json használja a vezénylési függvényekhez.
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
Figyelje meg a yield context.task_all(tasks); sort. A függvényhez intézett E2_CopyFileToBlob összes egyes hívás nem lett engedélyezve, ami lehetővé teszi számukra, hogy párhuzamosan fussanak. Amikor átadjuk ezt a feladattömböt context.task_all, egy olyan feladatot kapunk vissza, amely nem fejeződik be , amíg az összes másolási művelet be nem fejeződik. Ha ismeri asyncio.gather a Pythont, akkor ez nem újdonság Önnek. A különbség az, hogy ezek a feladatok egyszerre több virtuális gépen is futhatnak, és a Durable Functions bővítmény biztosítja, hogy a végpontok közötti végrehajtás rugalmas legyen az újrafeldolgozási folyamatokkal szemben.
Feljegyzés
Bár a feladatok fogalmilag hasonlítanak a Pythonra váró feladatokhoz, a vezénylő függvényeknek, valamint az API-knak is használniuk yield kell a context.task_allcontext.task_any feladat párhuzamosításának kezelésére.
A visszatérés context.task_allután tudjuk, hogy az összes függvényhívás befejeződött, és visszaadta nekünk az értékeket. Minden hívás E2_CopyFileToBlob a feltöltött bájtok számát adja vissza, így az összes visszatérési érték összeadásával kiszámíthatjuk az összeg bájtszámát.
Segítő tevékenységfüggvények
A segítő tevékenységfüggvények, a többi mintához hasonlóan, csak a triggerkötést használó activityTrigger szokásos függvények.
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
Feljegyzés
Felmerülhet a kérdés, hogy miért nem tudta közvetlenül a vezénylő függvénybe helyezni ezt a kódot. Megteheti, de ez megtörné a vezénylési funkciók egyik alapvető szabályát, ami azt jelentené, hogy soha nem szabad I/O-t végezniük, beleértve a helyi fájlrendszer-hozzáférést is. További információ: Orchestrator függvénykód korlátozásai.
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
[ActivityTrigger] string filePath,
Binder binder,
ILogger log)
{
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath
.Substring(Path.GetPathRoot(filePath).Length)
.Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(
new BlobAttribute(outputLocation, FileAccess.Write)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
Feljegyzés
A mintakód futtatásához telepítenie kell a Microsoft.Azure.WebJobs.Extensions.Storage NuGet-csomagot.
A függvény az Azure Functions-kötések néhány speciális funkcióját (azaz a Binder paraméter használatát) használja, de az útmutatóhoz nem kell aggódnia ezek miatt a részletek miatt.
A function.json fájl E2_CopyFileToBlob hasonló egyszerű:
A Python-implementáció az Azure Storage SDK for Python használatával tölti fel a fájlokat az Azure Blob Storage-ba.
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
Az implementáció betölti a fájlt a lemezről, és aszinkron módon streameli a tartalmat egy azonos nevű blobba a "backups" tárolóban. A visszatérési érték a tárba másolt bájtok száma, amelyet a vezénylő függvény az összesített összeg kiszámításához használ.
Feljegyzés
Ez egy tökéletes példa az I/O-műveletek függvénybe való áthelyezésére activityTrigger . A munka nem csak számos különböző gépen osztható el, hanem az előrehaladás ellenőrzésének előnyei is. Ha a gazdafolyamat bármilyen okból leáll, tudja, hogy mely feltöltések fejeződtek be.
Minta futtatása
A vezénylést Windows rendszeren az alábbi HTTP POST-kérés elküldésével indíthatja el.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
Másik lehetőségként linuxos függvényalkalmazáson (a Python jelenleg csak Linux for App Service-en fut) a vezénylést a következőképpen indíthatja el:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Feljegyzés
Az HttpStart éppen invokált függvény csak JSON formátumú tartalommal működik. Ezért a Content-Type: application/json fejlécre szükség van, és a címtár elérési útja JSON-sztringként van kódolva. Emellett a HTTP-kódrészlet feltételezi, hogy van egy bejegyzés a host.json fájlban, amely eltávolítja az alapértelmezett api/ előtagot az összes HTTP-eseményindító függvény URL-címéről. A konfiguráció jelölője a host.json mintákban található fájlban található.
Ez a HTTP-kérés aktiválja a E2_BackupSiteContent vezénylőt, és paraméterként adja át a sztringet D:\home\LogFiles . A válasz egy hivatkozást tartalmaz a biztonsági mentési művelet állapotának lekéréséhez:
Attól függően, hogy hány naplófájl található a függvényalkalmazásban, ez a művelet több percet is igénybe vehet. A legújabb állapotot az előző HTTP 202-válasz fejlécében található Location URL-cím lekérdezésével kaphatja meg.
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
Ebben az esetben a függvény továbbra is fut. Megtekintheti a vezénylési állapotba mentett bemenetet és az utolsó frissítés időpontját. A fejlécértékek használatával Location továbbra is lekérdezheti a befejezést. Ha az állapot "Befejezve" állapotú, a következőhöz hasonló HTTP-válaszérték jelenik meg:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
Most már láthatja, hogy a vezénylés befejeződött, és körülbelül mennyi időt vett igénybe. Megjelenik a output mező értéke is, amely azt jelzi, hogy körülbelül 450 KB napló lett feltöltve.
Következő lépések
Ez a minta bemutatja, hogyan valósíthatja meg a ventilátor-out/ventilátor-in mintát. A következő minta bemutatja, hogyan implementálhatja a monitormintát tartós időzítőkkel.