Sdílet prostřednictvím


Scénář fan-out/fan-in v Durable Functions – příklad cloudového zálohování

Ventilátory nebo ventilátory odkazují na model souběžného spouštění více funkcí a následné agregace výsledků. Tento článek vysvětluje ukázku, která používá Durable Functions k implementaci scénáře fan-in/fan-out. Ukázka je odolná funkce, která zálohuje veškerý obsah webu aplikace nebo její obsah do Služby Azure Storage.

Poznámka:

Verze 4 programovacího modelu Node.js pro Azure Functions je obecně dostupná. Nový model v4 je navržený tak, aby měl flexibilnější a intuitivnější prostředí pro vývojáře v JavaScriptu a TypeScriptu. Další informace o rozdílech mezi v3 a v4 najdete v průvodci migrací.

V následujících fragmentech kódu JavaScript (PM4) označuje programovací model V4, nové prostředí.

Požadavky

Přehled scénáře

V této ukázce funkce nahrají všechny soubory v zadaném adresáři rekurzivně do úložiště objektů blob. Počítají také celkový počet bajtů, které byly odeslány.

Je možné napsat jednu funkci, která se postará o všechno. Hlavním problémem, na který byste narazili, je škálovatelnost. Spuštění jedné funkce se dá spustit jenom na jednom virtuálním počítači, takže propustnost bude omezená propustností daného virtuálního počítače. Dalším problémem je spolehlivost. Pokud dojde k selhání uprostřed procesu nebo pokud celý proces trvá déle než 5 minut, může zálohování selhat v částečně dokončeném stavu. Pak by bylo potřeba ho restartovat.

Robustnějším přístupem by bylo zapsat dvě běžné funkce: jeden by vyčíslil soubory a přidal názvy souborů do fronty a jiný by načetl z fronty a nahrál soubory do úložiště objektů blob. Tento přístup je lepší z hlediska propustnosti a spolehlivosti, ale vyžaduje zřízení a správu fronty. Důležitější je, že významnější složitost se zavádí z hlediska správy stavu a koordinace , pokud chcete udělat něco víc, například hlásit celkový počet bajtů nahraných.

Přístup Durable Functions poskytuje všechny uvedené výhody s velmi nízkou režií.

Funkce

Tento článek vysvětluje následující funkce v ukázkové aplikaci:

funkce orchestrátoru E2_BackupSiteContent

Tato funkce orchestrátoru v podstatě dělá toto:

  1. rootDirectory Přebírá hodnotu jako vstupní parametr.
  2. Volá funkci, která získá rekurzivní seznam souborů v části rootDirectory.
  3. Provádí několik paralelních volání funkcí pro nahrání každého souboru do služby Azure Blob Storage.
  4. Čeká na dokončení všech nahrávání.
  5. Vrátí celkový součet bajtů, které byly odeslány do služby Azure Blob Storage.

Tady je kód, který implementuje funkci orchestrátoru:

[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

await Task.WhenAll(tasks); Všimněte si řádku. Všechna jednotlivá volání E2_CopyFileToBlob funkce nebyla očekávána, což jim umožňuje spustit paralelně. Když toto pole úkolů Task.WhenAllpředáme, získáme zpět úkol, který se nedokončí , dokud se nedokončí všechny operace kopírování. Pokud znáte knihovnu TPL (Task Parallel Library) v rozhraní .NET, není to pro vás nové. Rozdíl je v tom, že tyto úlohy můžou běžet na několika virtuálních počítačích současně a rozšíření Durable Functions zajišťuje, že je kompletní spouštění odolné vůči recyklaci procesů.

Po čekání na Task.WhenAllvolání víme, že všechna volání funkcí se dokončila a vrátila nám hodnoty zpět. Každé volání, které E2_CopyFileToBlob vrátí počet nahraných bajtů, takže výpočet celkového počtu bajtů je otázkou přidání všech vrácených hodnot dohromady.

Pomocné funkce aktivit

Pomocné funkce aktivit, stejně jako u jiných ukázek, jsou pouze běžné funkce, které používají vazbu triggeru activityTrigger .

funkce aktivity E2_GetFileList

[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

Poznámka:

Možná vás zajímá, proč jste tento kód nemohli jednoduše vložit přímo do funkce orchestrátoru. Mohli byste, ale to by přerušilo jedno ze základních pravidel funkcí orchestrátoru, což znamená, že by nikdy neměly provádět vstupně-výstupní operace, včetně přístupu k místnímu systému souborů. Další informace najdete v tématu Omezení kódu funkce orchestratoru.

funkce aktivity E2_CopyFileToBlob

[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

Poznámka:

Abyste mohli spustit vzorový kód, budete muset nainstalovat Microsoft.Azure.WebJobs.Extensions.Storage balíček NuGet.

Funkce používá některé pokročilé funkce vazeb Azure Functions (to znamená použití parametruBinder), ale nemusíte se starat o tyto podrobnosti pro účely tohoto názorného postupu.

Implementace načte soubor z disku a asynchronně streamuje obsah do objektu blob se stejným názvem v kontejneru "backups". Vrácená hodnota je počet bajtů zkopírovaných do úložiště, který pak používá funkce orchestrátoru k výpočtu agregovaného součtu.

Poznámka:

Toto je dokonalý příklad přesunu vstupně-výstupních operací do activityTrigger funkce. Nejen, že je možné práci distribuovat napříč mnoha různými počítači, ale získáte také výhody kontrolního bodu průběhu. Pokud se proces hostitele z nějakého důvodu ukončí, víte, které nahrávání se už dokončilo.

Spuštění ukázky

Orchestraci můžete spustit ve Windows odesláním následujícího požadavku HTTP POST.

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

Případně můžete v linuxové aplikaci funkcí (Python aktuálně běží jenom v Linuxu pro App Service), spustit orchestraci takto:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

Poznámka:

Funkce HttpStart , kterou vyvoláváte, funguje jenom s obsahem ve formátu JSON. Z tohoto důvodu je hlavička Content-Type: application/json povinná a cesta k adresáři je zakódovaná jako řetězec JSON. Kromě toho fragment kódu HTTP předpokládá, že v host.json souboru existuje položka, která odebere výchozí api/ předponu ze všech adres URL funkcí triggeru HTTP. Značky pro tuto konfiguraci najdete v host.json souboru v ukázkách.

Tento požadavek HTTP aktivuje E2_BackupSiteContent orchestrátor a předá řetězec D:\home\LogFiles jako parametr. Odpověď poskytuje odkaz na získání stavu operace zálohování:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

V závislosti na tom, kolik souborů protokolu máte v aplikaci funkcí, může dokončení této operace trvat několik minut. Nejnovější stav můžete získat dotazem na adresu URL v Location hlavičce předchozí odpovědi HTTP 202.

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

V tomto případě je funkce stále spuštěná. Můžete zobrazit vstup uložený ve stavu orchestrátoru a čas poslední aktualizace. Hodnoty záhlaví můžete dál používat Location k dotazování na dokončení. Když je stav Dokončeno, zobrazí se hodnota odpovědi HTTP podobná následující:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

Teď vidíte, že orchestrace je dokončená a přibližně kolik času trvalo dokončení. Zobrazí se také hodnota output pole, která značí, že se nahrálo přibližně 450 kB protokolů.

Další kroky

Tato ukázka ukázala, jak implementovat model fan-out/fan-in. Další ukázka ukazuje, jak implementovat vzor monitorování pomocí trvalých časovačů.