Delen via


Fan-out/fan-in scenario in Durable Functions - Voorbeeld van cloudback-up

Fan-out/fan-in verwijst naar het patroon van het gelijktijdig uitvoeren van meerdere functies en voert vervolgens een aggregatie uit op de resultaten. In dit artikel wordt een voorbeeld uitgelegd waarin Durable Functions wordt gebruikt om een fan-in-/fan-outscenario te implementeren. Het voorbeeld is een duurzame functie waarmee een back-up wordt gemaakt van alle of sommige site-inhoud van een app in Azure Storage.

Notitie

Versie 4 van het Node.js programmeermodel voor Azure Functions is algemeen beschikbaar. Het nieuwe v4-model is ontworpen voor een flexibelere en intuïtievere ervaring voor JavaScript- en TypeScript-ontwikkelaars. Meer informatie over de verschillen tussen v3 en v4 in de migratiehandleiding.

In de volgende codefragmenten geeft JavaScript (PM4) het programmeermodel V4 aan, de nieuwe ervaring.

Vereisten

Overzicht van scenario

In dit voorbeeld uploaden de functies alle bestanden onder een opgegeven map recursief naar blobopslag. Ze tellen ook het totale aantal bytes dat is geüpload.

Het is mogelijk om één functie te schrijven die voor alles zorgt. Het belangrijkste probleem dat u zou tegenkomen, is schaalbaarheid. Een uitvoering van één functie kan alleen worden uitgevoerd op één virtuele machine, zodat de doorvoer wordt beperkt door de doorvoer van die ene virtuele machine. Een ander probleem is betrouwbaarheid. Als er halverwege een fout is opgetreden of als het hele proces langer dan 5 minuten duurt, kan de back-up mislukken in een gedeeltelijk voltooide status. Deze moet vervolgens opnieuw worden opgestart.

Een krachtigere benadering is het schrijven van twee normale functies: een zou de bestanden opsommen en de bestandsnamen toevoegen aan een wachtrij, en een andere zou lezen uit de wachtrij en de bestanden uploaden naar blobopslag. Deze aanpak is beter qua doorvoer en betrouwbaarheid, maar hiervoor moet u een wachtrij inrichten en beheren. Belangrijker is dat er een aanzienlijke complexiteit wordt geïntroduceerd in termen van statusbeheer en coördinatie als u iets meer wilt doen, zoals het totale aantal geüploade bytes rapporteren.

Een Durable Functions-benadering biedt u alle genoemde voordelen met een zeer lage overhead.

De functies

In dit artikel worden de volgende functies in de voorbeeld-app uitgelegd:

  • E2_BackupSiteContent: Een orchestratorfunctie die aanroept E2_GetFileList om een lijst met bestanden te verkrijgen waarvan een back-up moet worden gemaakt, en roept E2_CopyFileToBlob vervolgens een back-up van elk bestand aan.
  • E2_GetFileList: Een activiteitsfunctie die een lijst met bestanden in een map retourneert.
  • E2_CopyFileToBlob: Een activiteitsfunctie die een back-up maakt van één bestand naar Azure Blob Storage.

E2_BackupSiteContent orchestratorfunctie

Deze orchestratorfunctie doet in wezen het volgende:

  1. Neemt een rootDirectory waarde als invoerparameter.
  2. Roept een functie aan om een recursieve lijst met bestanden onder rootDirectoryte krijgen.
  3. Maakt meerdere parallelle functie-aanroepen om elk bestand te uploaden naar Azure Blob Storage.
  4. Wacht totdat alle uploads zijn voltooid.
  5. Retourneert de som van het totale aantal bytes dat is geüpload naar Azure Blob Storage.

Hier volgt de code waarmee de orchestratorfunctie wordt geïmplementeerd:

[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

Let op de await Task.WhenAll(tasks); regel. Alle afzonderlijke aanroepen naar de E2_CopyFileToBlob functie zijn niet in afwachting, waardoor ze parallel kunnen worden uitgevoerd. Wanneer we deze reeks taken Task.WhenAlldoorgeven, krijgen we een taak terug die niet wordt voltooid totdat alle kopieerbewerkingen zijn voltooid. Als u bekend bent met de TPL (Task Parallel Library) in .NET, is dit niet nieuw voor u. Het verschil is dat deze taken gelijktijdig op meerdere virtuele machines kunnen worden uitgevoerd en dat de Durable Functions-extensie ervoor zorgt dat de end-to-end-uitvoering bestand is tegen het verwerken van recycling.

Nadat we Task.WhenAllhebben gewacht, weten we dat alle functieaanroepen zijn voltooid en waarden terug naar ons hebben geretourneerd. Elke aanroep om E2_CopyFileToBlob het aantal geüploade bytes te retourneren, dus het berekenen van het totale byteaantal is een kwestie van het samenvoegen van al deze retourwaarden.

Helper-activiteitsfuncties

De helperactiviteitsfuncties, net als bij andere voorbeelden, zijn gewoon reguliere functies die gebruikmaken van de activityTrigger triggerbinding.

E2_GetFileList activiteitsfunctie

[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

Notitie

U vraagt zich misschien af waarom u deze code niet alleen rechtstreeks in de orchestratorfunctie kunt plaatsen. U kunt, maar dit zou een van de fundamentele regels van orchestratorfuncties breken, wat inhoudt dat ze nooit I/O mogen uitvoeren, inclusief toegang tot het lokale bestandssysteem. Zie Codebeperkingen voor Orchestrator-functies voor meer informatie.

E2_CopyFileToBlob activiteitsfunctie

[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

Notitie

U moet het Microsoft.Azure.WebJobs.Extensions.Storage NuGet-pakket installeren om de voorbeeldcode uit te voeren.

De functie maakt gebruik van enkele geavanceerde functies van Azure Functions-bindingen (het gebruik van de Binder parameter), maar u hoeft zich geen zorgen te maken over deze details voor dit scenario.

De implementatie laadt het bestand vanaf de schijf en streamt de inhoud asynchroon naar een blob met dezelfde naam in de container 'back-ups'. De retourwaarde is het aantal bytes dat naar de opslag is gekopieerd, dat vervolgens door de orchestratorfunctie wordt gebruikt om de geaggregeerde som te berekenen.

Notitie

Dit is een perfect voorbeeld van het verplaatsen van I/O-bewerkingen naar een activityTrigger functie. Het werk kan niet alleen worden verdeeld over veel verschillende machines, maar u krijgt ook de voordelen van het controleren van de voortgang. Als het hostproces om welke reden dan ook wordt beëindigd, weet u welke uploads al zijn voltooid.

De voorbeeldtoepassing uitvoeren

U kunt de indeling in Windows starten door de volgende HTTP POST-aanvraag te verzenden.

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

U kunt ook op een Linux-functie-app (Python wordt momenteel alleen uitgevoerd op Linux voor App Service) de indeling als volgt starten:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

Notitie

De HttpStart functie die u aanroept, werkt alleen met inhoud met JSON-indeling. Daarom is de Content-Type: application/json header vereist en wordt het mappad gecodeerd als een JSON-tekenreeks. Bovendien wordt ervan uitgegaan dat er een vermelding in het host.json bestand is die het standaardvoorvoegsel api/ verwijdert uit alle URL's van HTTP-triggerfuncties. U vindt de opmaak voor deze configuratie in het host.json bestand in de voorbeelden.

Deze HTTP-aanvraag activeert de E2_BackupSiteContent orchestrator en geeft de tekenreeks D:\home\LogFiles door als een parameter. Het antwoord bevat een koppeling om de status van de back-upbewerking op te halen:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

Afhankelijk van het aantal logboekbestanden in uw functie-app, kan het enkele minuten duren voordat deze bewerking is voltooid. U kunt de meest recente status ophalen door een query uit te voeren op de URL in de Location header van het vorige HTTP 202-antwoord.

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

In dit geval wordt de functie nog steeds uitgevoerd. U kunt de invoer zien die is opgeslagen in de orchestratorstatus en de laatst bijgewerkte tijd. U kunt de Location headerwaarden blijven gebruiken om te peilen naar voltooiing. Wanneer de status 'Voltooid' is, ziet u een HTTP-antwoordwaarde die er ongeveer als volgt uitziet:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

U kunt nu zien dat de indeling is voltooid en ongeveer hoeveel tijd het kostte om de indeling te voltooien. U ziet ook een waarde voor het output veld, wat aangeeft dat er ongeveer 450 kB aan logboeken is geüpload.

Volgende stappen

In dit voorbeeld ziet u hoe u het fan-out/fan-in patroon implementeert. In het volgende voorbeeld ziet u hoe u het monitorpatroon implementeert met behulp van duurzame timers.