Share via


Scenario voor uitwaaiing/inwaaiing in Durable Functions - Voorbeeld van cloudback-up

Fan-out/fan-in verwijst naar het patroon van het gelijktijdig uitvoeren van meerdere functies en het vervolgens uitvoeren van een aggregatie op de resultaten. In dit artikel wordt een voorbeeld uitgelegd waarin Durable Functions wordt gebruikt om een scenario voor in-/uitwaaiing te implementeren. Het voorbeeld is een duurzame functie die een back-up maakt van alle of een deel van de site-inhoud van een app in Azure Storage.

Notitie

Versie 4 van het Node.js programmeermodel voor Azure Functions is algemeen beschikbaar. Het nieuwe v4-model is ontworpen voor een flexibelere en intuïtievere ervaring voor JavaScript- en TypeScript-ontwikkelaars. Meer informatie over de verschillen tussen v3 en v4 vindt u in de migratiehandleiding.

In de volgende codefragmenten geeft JavaScript (PM4) het programmeermodel V4 aan, de nieuwe ervaring.

Vereisten

Overzicht van scenario's

In dit voorbeeld uploaden de functies alle bestanden onder een opgegeven map recursief naar blobopslag. Ze tellen ook het totale aantal bytes dat is geüpload.

Het is mogelijk om één functie te schrijven die alles regelt. Het belangrijkste probleem dat u zou tegenkomen, is schaalbaarheid. Een uitvoering van één functie kan slechts op één virtuele machine worden uitgevoerd, dus de doorvoer wordt beperkt door de doorvoer van die ene VM. Een ander probleem is de betrouwbaarheid. Als er halverwege een fout optreedt of als het hele proces langer dan 5 minuten duurt, kan de back-up mislukken in een gedeeltelijk voltooide status. Vervolgens moet het opnieuw worden opgestart.

Een krachtigere benadering is het schrijven van twee reguliere functies: een zou de bestanden opsommen en de bestandsnamen toevoegen aan een wachtrij, en een andere zou uit de wachtrij lezen en de bestanden uploaden naar blob-opslag. Deze benadering is beter in termen van doorvoer en betrouwbaarheid, maar vereist dat u een wachtrij inricht en beheert. Belangrijker is dat er een aanzienlijke complexiteit wordt geïntroduceerd in termen van statusbeheer en coördinatie als u meer wilt doen, zoals het rapporteren van het totale aantal geüploade bytes.

Een Durable Functions aanpak biedt u alle genoemde voordelen met een zeer lage overhead.

De functies

In dit artikel worden de volgende functies in de voorbeeld-app uitgelegd:

  • E2_BackupSiteContent: een orchestratorfunctie die aanroept E2_GetFileList om een lijst met bestanden te verkrijgen waarvan u een back-up wilt maken en die vervolgens aanroept E2_CopyFileToBlob om een back-up van elk bestand te maken.
  • E2_GetFileList: Een activiteitsfunctie die een lijst met bestanden in een map retourneert.
  • E2_CopyFileToBlob: Een activiteitsfunctie die een back-up maakt van één bestand naar Azure Blob Storage.

E2_BackupSiteContent orchestratorfunctie

Deze orchestratorfunctie doet in wezen het volgende:

  1. Neemt een rootDirectory waarde als invoerparameter.
  2. Roept een functie aan om een recursieve lijst met bestanden onder rootDirectoryop te halen.
  3. Maakt meerdere parallelle functie-aanroepen om elk bestand te uploaden naar Azure Blob Storage.
  4. Wacht totdat alle uploads zijn voltooid.
  5. Retourneert de som van het totale aantal bytes dat is geüpload naar Azure Blob Storage.

Dit is de code waarmee de orchestratorfunctie wordt geïmplementeerd:

[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

Let op de await Task.WhenAll(tasks); regel. Alle afzonderlijke aanroepen van de E2_CopyFileToBlob functie zijn niet afgewacht, waardoor ze parallel kunnen worden uitgevoerd. Wanneer we deze matrix met taken doorgeven aan Task.WhenAll, krijgen we een taak terug die pas wordt voltooid als alle kopieerbewerkingen zijn voltooid. Als u bekend bent met de TPL (Task Parallel Library) in .NET, is dit niet nieuw voor u. Het verschil is dat deze taken gelijktijdig op meerdere virtuele machines kunnen worden uitgevoerd en de extensie Durable Functions zorgt ervoor dat de end-to-end-uitvoering bestand is tegen procesrecycling.

Nadat we hebben gewacht van Task.WhenAll, weten we dat alle functie-aanroepen zijn voltooid en waarden naar ons hebben geretourneerd. Elke aanroep naar E2_CopyFileToBlob retourneert het aantal geüploade bytes, dus het berekenen van het totale aantal bytes is een kwestie van het optellen van al deze retourwaarden bij elkaar.

Helper-activiteitsfuncties

De helperactiviteitsfuncties zijn, net als bij andere voorbeelden, gewoon normale functies die gebruikmaken van de activityTrigger triggerbinding.

E2_GetFileList activiteitsfunctie

[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

Notitie

U vraagt zich misschien af waarom u deze code niet rechtstreeks in de orchestratorfunctie kunt plaatsen. Dat zou kunnen, maar dit zou een van de fundamentele regels van orchestratorfuncties breken, namelijk dat ze nooit I/O mogen uitvoeren, inclusief toegang tot het lokale bestandssysteem. Zie Beperkingen voor Orchestrator-functiecode voor meer informatie.

E2_CopyFileToBlob activiteitsfunctie

[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

Notitie

U moet het Microsoft.Azure.WebJobs.Extensions.Storage NuGet-pakket installeren om de voorbeeldcode uit te voeren.

De functie maakt gebruik van enkele geavanceerde functies van Azure Functions bindingen (dat wil gezegd, het gebruik van de Binder parameter), maar u hoeft zich geen zorgen te maken over deze details voor het doel van dit scenario.

Tijdens de implementatie wordt het bestand van de schijf geladen en wordt de inhoud asynchroon gestreamd naar een blob met dezelfde naam in de container 'back-ups'. De retourwaarde is het aantal bytes dat naar de opslag wordt gekopieerd, die vervolgens door de orchestratorfunctie wordt gebruikt om de geaggregeerde som te berekenen.

Notitie

Dit is een perfect voorbeeld van het verplaatsen van I/O-bewerkingen naar een activityTrigger functie. Het werk kan niet alleen worden verdeeld over veel verschillende machines, maar u krijgt ook de voordelen van het controleren van de voortgang. Als het hostproces om welke reden dan ook wordt beëindigd, weet u welke uploads al zijn voltooid.

De voorbeeldtoepassing uitvoeren

U kunt de indeling in Windows starten door de volgende HTTP POST-aanvraag te verzenden.

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

In een Linux-functie-app (Python wordt momenteel alleen uitgevoerd op Linux voor App Service), kunt u de indeling als volgt starten:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

Notitie

De HttpStart functie die u aanroept, werkt alleen met inhoud in JSON-indeling. Daarom is de Content-Type: application/json header vereist en wordt het mappad gecodeerd als een JSON-tekenreeks. Bovendien wordt in het HTTP-fragment ervan uitgegaan dat er een vermelding in het host.json bestand is die het standaardvoorvoegsel api/ verwijdert uit alle URL's voor HTTP-triggerfuncties. U vindt de markeringen voor deze configuratie in het host.json bestand in de voorbeelden.

Deze HTTP-aanvraag activeert de E2_BackupSiteContent orchestrator en geeft de tekenreeks D:\home\LogFiles door als een parameter. Het antwoord bevat een koppeling om de status van de back-upbewerking op te halen:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

Afhankelijk van het aantal logboekbestanden dat uw functie-app bevat, kan deze bewerking enkele minuten duren. U kunt de meest recente status ophalen door een query uit te voeren op de URL in de Location header van het vorige HTTP 202-antwoord.

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

In dit geval wordt de functie nog steeds uitgevoerd. U kunt de invoer zien die is opgeslagen in de orchestratorstatus en het tijdstip van de laatste update. U kunt de Location headerwaarden blijven gebruiken om te peilen naar voltooiing. Wanneer de status 'Voltooid' is, ziet u een HTTP-antwoordwaarde die er ongeveer als volgt uitziet:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

U kunt nu zien dat de indeling is voltooid en hoeveel tijd het ongeveer kostte om te voltooien. U ziet ook een waarde voor het output veld, wat aangeeft dat er ongeveer 450 kB aan logboeken is geüpload.

Volgende stappen

In dit voorbeeld is getoond hoe u het patroon voor uitwaaiers/inwaaiers implementeert. In het volgende voorbeeld ziet u hoe u het monitorpatroon implementeert met behulp van duurzame timers.