Scénář fan-out/fan-in v Durable Functions – příklad cloudového zálohování
Článek
Ventilátory nebo ventilátory odkazují na model souběžného spouštění více funkcí a následné agregace výsledků. Tento článek vysvětluje ukázku, která používá Durable Functions k implementaci scénáře fan-in/fan-out. Ukázka je odolná funkce, která zálohuje veškerý obsah webu aplikace nebo její obsah do Služby Azure Storage.
Poznámka:
Verze 4 programovacího modelu Node.js pro Azure Functions je obecně dostupná. Nový model v4 je navržený tak, aby měl flexibilnější a intuitivnější prostředí pro vývojáře v JavaScriptu a TypeScriptu. Další informace o rozdílech mezi v3 a v4 najdete v průvodci migrací.
V následujících fragmentech kódu JavaScript (PM4) označuje programovací model V4, nové prostředí.
V této ukázce funkce nahrají všechny soubory v zadaném adresáři rekurzivně do úložiště objektů blob. Počítají také celkový počet bajtů, které byly odeslány.
Je možné napsat jednu funkci, která se postará o všechno. Hlavním problémem, na který byste narazili, je škálovatelnost. Spuštění jedné funkce se dá spustit jenom na jednom virtuálním počítači, takže propustnost bude omezená propustností daného virtuálního počítače. Dalším problémem je spolehlivost. Pokud dojde k selhání uprostřed procesu nebo pokud celý proces trvá déle než 5 minut, může zálohování selhat v částečně dokončeném stavu. Pak by bylo potřeba ho restartovat.
Robustnějším přístupem by bylo zapsat dvě běžné funkce: jeden by vyčíslil soubory a přidal názvy souborů do fronty a jiný by načetl z fronty a nahrál soubory do úložiště objektů blob. Tento přístup je lepší z hlediska propustnosti a spolehlivosti, ale vyžaduje zřízení a správu fronty. Důležitější je, že významnější složitost se zavádí z hlediska správy stavu a koordinace , pokud chcete udělat něco víc, například hlásit celkový počet bajtů nahraných.
Přístup Durable Functions poskytuje všechny uvedené výhody s velmi nízkou režií.
Funkce
Tento článek vysvětluje následující funkce v ukázkové aplikaci:
Tady je kód, který implementuje funkci orchestrátoru:
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
[OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>()?.Trim();
if (string.IsNullOrEmpty(rootDirectory))
{
rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
}
string[] files = await backupContext.CallActivityAsync<string[]>(
"E2_GetFileList",
rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallActivityAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
await Task.WhenAll(tasks); Všimněte si řádku. Všechna jednotlivá volání E2_CopyFileToBlob funkce nebyla očekávána, což jim umožňuje spustit paralelně. Když toto pole úkolů Task.WhenAllpředáme, získáme zpět úkol, který se nedokončí , dokud se nedokončí všechny operace kopírování. Pokud znáte knihovnu TPL (Task Parallel Library) v rozhraní .NET, není to pro vás nové. Rozdíl je v tom, že tyto úlohy můžou běžet na několika virtuálních počítačích současně a rozšíření Durable Functions zajišťuje, že je kompletní spouštění odolné vůči recyklaci procesů.
Po čekání na Task.WhenAllvolání víme, že všechna volání funkcí se dokončila a vrátila nám hodnoty zpět. Každé volání, které E2_CopyFileToBlob vrátí počet nahraných bajtů, takže výpočet celkového počtu bajtů je otázkou přidání všech vrácených hodnot dohromady.
Funkce používá standardní function.json pro funkce orchestrátoru.
Tady je kód, který implementuje funkci orchestrátoru:
const df = require("durable-functions");
module.exports = df.orchestrator(function* (context) {
const rootDirectory = context.df.getInput();
if (!rootDirectory) {
throw new Error("A directory path is required as an input.");
}
const files = yield context.df.callActivity("E2_GetFileList", rootDirectory);
// Backup Files and save Promises into array
const tasks = [];
for (const file of files) {
tasks.push(context.df.callActivity("E2_CopyFileToBlob", file));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results.reduce((prev, curr) => prev + curr, 0);
// return results;
return totalBytes;
});
yield context.df.Task.all(tasks); Všimněte si řádku. Všechna jednotlivá volání E2_CopyFileToBlob funkce nebyla vyvolána, což jim umožňuje běžet paralelně. Když toto pole úkolů context.df.Task.allpředáme, získáme zpět úkol, který se nedokončí , dokud se nedokončí všechny operace kopírování. Pokud jste obeznámeni s JavaScriptem Promise.all , není to pro vás nové. Rozdíl je v tom, že tyto úlohy můžou běžet na několika virtuálních počítačích současně a rozšíření Durable Functions zajišťuje, že je kompletní spouštění odolné vůči recyklaci procesů.
Poznámka:
I když jsou úkoly koncepčně podobné příslibům JavaScriptu, měly by funkce orchestrátoru používat context.df.Task.all a context.df.Task.any ne Promise.allPromise.race a spravovat paralelizaci úkolů.
Jakmile se vrátíme context.df.Task.all, víme, že všechna volání funkcí se dokončila a vrátila zpět hodnoty. Každé volání, které E2_CopyFileToBlob vrátí počet nahraných bajtů, takže výpočet celkového počtu bajtů je otázkou přidání všech vrácených hodnot dohromady.
Tady je kód, který implementuje funkci orchestrátoru:
const df = require("durable-functions");
const path = require("path");
const getFileListActivityName = "getFileList";
const copyFileToBlobActivityName = "copyFileToBlob";
df.app.orchestration("backupSiteContent", function* (context) {
const rootDir = context.df.getInput();
if (!rootDir) {
throw new Error("A directory path is required as an input.");
}
const rootDirAbs = path.resolve(rootDir);
const files = yield context.df.callActivity(getFileListActivityName, rootDirAbs);
// Backup Files and save Tasks into array
const tasks = [];
for (const file of files) {
const input = {
backupPath: path.relative(rootDirAbs, file).replace("\\", "/"),
filePath: file,
};
tasks.push(context.df.callActivity(copyFileToBlobActivityName, input));
}
// wait for all the Backup Files Activities to complete, sum total bytes
const results = yield context.df.Task.all(tasks);
const totalBytes = results ? results.reduce((prev, curr) => prev + curr, 0) : 0;
// return results;
return totalBytes;
});
yield context.df.Task.all(tasks); Všimněte si řádku. Všechna jednotlivá volání copyFileToBlob funkce nebyla vyvolána, což jim umožňuje běžet paralelně. Když toto pole úkolů context.df.Task.allpředáme, získáme zpět úkol, který se nedokončí , dokud se nedokončí všechny operace kopírování. Pokud jste obeznámeni s JavaScriptem Promise.all , není to pro vás nové. Rozdíl je v tom, že tyto úlohy můžou běžet na několika virtuálních počítačích současně a rozšíření Durable Functions zajišťuje, že je kompletní spouštění odolné vůči recyklaci procesů.
Poznámka:
I když jsou úkoly koncepčně podobné příslibům JavaScriptu, měly by funkce orchestrátoru používat context.df.Task.all a context.df.Task.any ne Promise.allPromise.race a spravovat paralelizaci úkolů.
Jakmile se vrátíme context.df.Task.all, víme, že všechna volání funkcí se dokončila a vrátila zpět hodnoty. Každé volání, které copyFileToBlob vrátí počet nahraných bajtů, takže výpočet celkového počtu bajtů je otázkou přidání všech vrácených hodnot dohromady.
Funkce používá standardní function.json pro funkce orchestrátoru.
Tady je kód, který implementuje funkci orchestrátoru:
import azure.functions as func
import azure.durable_functions as df
def orchestrator_function(context: df.DurableOrchestrationContext):
root_directory: str = context.get_input()
if not root_directory:
raise Exception("A directory path is required as input")
files = yield context.call_activity("E2_GetFileList", root_directory)
tasks = []
for file in files:
tasks.append(context.call_activity("E2_CopyFileToBlob", file))
results = yield context.task_all(tasks)
total_bytes = sum(results)
return total_bytes
main = df.Orchestrator.create(orchestrator_function)
yield context.task_all(tasks); Všimněte si řádku. Všechna jednotlivá volání E2_CopyFileToBlob funkce nebyla vyvolána, což jim umožňuje běžet paralelně. Když toto pole úkolů context.task_allpředáme, získáme zpět úkol, který se nedokončí , dokud se nedokončí všechny operace kopírování. Pokud jste obeznámeni s Pythonem asyncio.gather , není to pro vás nové. Rozdíl je v tom, že tyto úlohy můžou běžet na několika virtuálních počítačích současně a rozšíření Durable Functions zajišťuje, že je kompletní spouštění odolné vůči recyklaci procesů.
Poznámka:
I když jsou úkoly koncepčně podobné očekávaným funkcím Pythonu, měly by funkce orchestrátoru context.task_allcontext.task_any používat yield i rozhraní API ke správě paralelizace úloh.
Jakmile se vrátíme context.task_all, víme, že všechna volání funkcí se dokončila a vrátila zpět hodnoty. Každé volání, které E2_CopyFileToBlob vrátí počet nahraných bajtů, abychom mohli vypočítat celkový počet bajtů přidáním všech vrácených hodnot dohromady.
Pomocné funkce aktivit
Pomocné funkce aktivit, stejně jako u jiných ukázek, jsou pouze běžné funkce, které používají vazbu triggeru activityTrigger .
import os
from os.path import dirname
from typing import List
def main(rootDirectory: str) -> List[str]:
all_file_paths = []
# We walk the file system
for path, _, files in os.walk(rootDirectory):
# We copy the code for activities and orchestrators
if "E2_" in path:
# For each file, we add their full-path to the list
for name in files:
if name == "__init__.py" or name == "function.json":
file_path = os.path.join(path, name)
all_file_paths.append(file_path)
return all_file_paths
Poznámka:
Možná vás zajímá, proč jste tento kód nemohli jednoduše vložit přímo do funkce orchestrátoru. Mohli byste, ale to by přerušilo jedno ze základních pravidel funkcí orchestrátoru, což znamená, že by nikdy neměly provádět vstupně-výstupní operace, včetně přístupu k místnímu systému souborů. Další informace najdete v tématu Omezení kódu funkce orchestratoru.
Funkce používá některé pokročilé funkce vazeb Azure Functions (to znamená použití parametruBinder), ale nemusíte se starat o tyto podrobnosti pro účely tohoto názorného postupu.
Soubor function.json je E2_CopyFileToBlob podobně jednoduchý:
import os
import pathlib
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
connect_str = os.getenv('AzureWebJobsStorage')
def main(filePath: str) -> str:
# Create the BlobServiceClient object which will be used to create a container client
blob_service_client = BlobServiceClient.from_connection_string(connect_str)
# Create a unique name for the container
container_name = "backups"
# Create the container if it does not exist
try:
blob_service_client.create_container(container_name)
except ResourceExistsError:
pass
# Create a blob client using the local file name as the name for the blob
parent_dir, fname = pathlib.Path(filePath).parts[-2:] # Get last two path components
blob_name = parent_dir + "_" + fname
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
# Count bytes in file
byte_count = os.path.getsize(filePath)
# Upload the created file
with open(filePath, "rb") as data:
blob_client.upload_blob(data)
return byte_count
Implementace načte soubor z disku a asynchronně streamuje obsah do objektu blob se stejným názvem v kontejneru "backups". Vrácená hodnota je počet bajtů zkopírovaných do úložiště, který pak používá funkce orchestrátoru k výpočtu agregovaného součtu.
Poznámka:
Toto je dokonalý příklad přesunu vstupně-výstupních operací do activityTrigger funkce. Nejen, že je možné práci distribuovat napříč mnoha různými počítači, ale získáte také výhody kontrolního bodu průběhu. Pokud se proces hostitele z nějakého důvodu ukončí, víte, které nahrávání se už dokončilo.
Spuštění ukázky
Orchestraci můžete spustit ve Windows odesláním následujícího požadavku HTTP POST.
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"D:\\home\\LogFiles"
Případně můžete v linuxové aplikaci funkcí (Python aktuálně běží jenom v Linuxu pro App Service), spustit orchestraci takto:
POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20
"/home/site/wwwroot"
Poznámka:
Funkce HttpStart , kterou vyvoláváte, funguje jenom s obsahem ve formátu JSON. Z tohoto důvodu je hlavička Content-Type: application/json povinná a cesta k adresáři je zakódovaná jako řetězec JSON. Kromě toho fragment kódu HTTP předpokládá, že v host.json souboru existuje položka, která odebere výchozí api/ předponu ze všech adres URL funkcí triggeru HTTP. Značky pro tuto konfiguraci najdete v host.json souboru v ukázkách.
Tento požadavek HTTP aktivuje E2_BackupSiteContent orchestrátor a předá řetězec D:\home\LogFiles jako parametr. Odpověď poskytuje odkaz na získání stavu operace zálohování:
V závislosti na tom, kolik souborů protokolu máte v aplikaci funkcí, může dokončení této operace trvat několik minut. Nejnovější stav můžete získat dotazem na adresu URL v Location hlavičce předchozí odpovědi HTTP 202.
GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
V tomto případě je funkce stále spuštěná. Můžete zobrazit vstup uložený ve stavu orchestrátoru a čas poslední aktualizace. Hodnoty záhlaví můžete dál používat Location k dotazování na dokončení. Když je stav Dokončeno, zobrazí se hodnota odpovědi HTTP podobná následující:
HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8
{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}
Teď vidíte, že orchestrace je dokončená a přibližně kolik času trvalo dokončení. Zobrazí se také hodnota output pole, která značí, že se nahrálo přibližně 450 kB protokolů.
Další kroky
Tato ukázka ukázala, jak implementovat model fan-out/fan-in. Další ukázka ukazuje, jak implementovat vzor monitorování pomocí trvalých časovačů.