Delen via


Fan-out/fan-in situatie

Fan-out/fan-in voert meerdere functies parallel uit en voegt vervolgens de resultaten samen. In dit artikel wordt een voorbeeld weergegeven waarin Durable Functions wordt gebruikt om een back-up te maken van bepaalde of alle site-inhoud van een app naar Azure Storage.

Vereiste voorwaarden

Fan-out/fan-in voert meerdere activiteiten parallel uit en voegt vervolgens de resultaten samen. In dit artikel wordt beschreven hoe u het patroon implementeert met behulp van de Durable Task SDK's voor .NET, JavaScript, Python en Java.

Overzicht van scenario

In dit voorbeeld uploaden de functies alle bestanden onder een opgegeven map (recursief) naar blobopslag. Ze tellen ook het totale aantal geüploade bytes.

Eén functie kan alles verwerken, maar niet schalen. Een uitvoering van één functie wordt uitgevoerd op één virtuele machine (VM), dus de doorvoer is beperkt tot die VM. Betrouwbaarheid is een andere zorg. Als het proces halverwege mislukt of langer dan vijf minuten duurt, kan de back-up een gedeeltelijk voltooide status hebben. Vervolgens start u de back-up opnieuw op.

Een krachtigere benadering is het gebruik van twee afzonderlijke functies: een opsomming van de bestanden en voegt bestandsnamen toe aan een wachtrij en de andere leest uit de wachtrij en uploadt de bestanden naar blob-opslag. Deze aanpak verbetert de doorvoer en betrouwbaarheid, maar u moet de wachtrij instellen en beheren. Belangrijker is dat deze aanpak complexiteit toevoegt voor statusbeheer en coördinatie, zoals het rapporteren van het totale aantal geüploade bytes.

Durable Functions biedt al deze voordelen met weinig overhead.

In het volgende voorbeeld verwerkt de orchestrator meerdere werkitems parallel en worden de resultaten vervolgens samengevoegd. Dit patroon is handig wanneer u het volgende moet doen:

  • Een batch met items verwerken waarbij elk item onafhankelijk kan worden verwerkt
  • Werk verdelen over meerdere computers voor betere doorvoer
  • Resultaten van alle parallelle bewerkingen aggregeren

Zonder het fan-out/fan-in patroon verwerkt u items sequentieel, waardoor de doorvoer wordt beperkt of u uw eigen wachtrij- en coördinatielogica beheert, wat complexiteit toevoegt.

De DURABLE Task SDK's verwerken parallellisatie en coördinatie, dus het patroon is eenvoudig te implementeren.

De functies

In dit artikel worden de functies in de voorbeeld-app beschreven:

  • E2_BackupSiteContent: Een orchestratorfunctie die E2_GetFileList aanroept om een lijst met bestanden op te halen waarvan een back-up moet worden gemaakt, en die vervolgens E2_CopyFileToBlob aanroept voor elk bestand.
  • E2_GetFileList: Een activiteitsfunctie die een lijst met bestanden in een map retourneert.
  • E2_CopyFileToBlob: een activiteitsfunctie die een back-up maakt van één bestand naar Azure Blob Storage.

In dit artikel worden de onderdelen in de voorbeeldcode beschreven:

  • ParallelProcessingOrchestration, fanOutFanInOrchestrator, fan_out_fan_in_orchestrator of FanOutFanIn_WordCount: Een orchestrator die activiteiten parallel uitwaaiert, wacht totdat alle activiteiten zijn voltooid, en vervolgens inwaaiert door de resultaten te aggregeren.
  • ProcessWorkItemActivity, processWorkItem, process_work_itemof CountWords: een activiteit die één werkitem verwerkt.
  • AggregateResultsActivity, aggregateResultsof aggregate_results: een activiteit die resultaten van alle parallelle bewerkingen samenvoegt.

Orchestrator

Deze orchestratorfunctie doet het volgende:

  1. Neemt rootDirectory als invoer.
  2. Roept een functie aan om een recursieve lijst met bestanden onder rootDirectoryte krijgen.
  3. Hiermee worden parallelle functie-aanroepen uitgevoerd om elk bestand te uploaden naar Azure Blob Storage.
  4. Wacht totdat alle uploads zijn voltooid.
  5. Retourneert het totale aantal geüploade bytes naar Azure Blob Storage.

Hier volgt de code waarmee de orchestratorfunctie wordt geïmplementeerd:

[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

Let op de await Task.WhenAll(tasks); regel. De code wacht niet op de afzonderlijke aanroepen, E2_CopyFileToBlobzodat ze parallel worden uitgevoerd. Wanneer de orchestrator de takenlijst naar Task.WhenAll doorgeeft, wordt er een taak geretourneerd die pas wordt voltooid als alle kopieerbewerkingen zijn afgerond. Als u bekend bent met de TPL (Task Parallel Library) in .NET, is dit patroon bekend. Het verschil is dat deze taken gelijktijdig op meerdere virtuele machines kunnen worden uitgevoerd en dat de Durable Functions-extensie ervoor zorgt dat de end-to-end-uitvoering bestand is tegen het verwerken van recycling.

Nadat de orchestrator wacht Task.WhenAll, zijn alle functieaanroepen voltooid en worden de waarden geretourneerd. Elke aanroep voor E2_CopyFileToBlob geeft het aantal geüploade bytes terug. Bereken het totaal door de retourwaarden toe te voegen.

De orchestrator doet het volgende:

  1. Hiermee wordt een lijst met werkitems als invoer gebruikt.
  2. Verspreidt zich door voor elk werkitem een taak te maken en ze parallel te verwerken.
  3. Wacht totdat alle parallelle taken zijn voltooid.
  4. Fans binnenhalen door de resultaten te aggregeren.
using Microsoft.DurableTask;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class ParallelProcessingOrchestration : TaskOrchestrator<List<string>, Dictionary<string, int>>
{
    public override async Task<Dictionary<string, int>> RunAsync(
        TaskOrchestrationContext context, List<string> workItems)
    {
        // Step 1: Fan-out by creating a task for each work item in parallel
        var processingTasks = new List<Task<Dictionary<string, int>>>();

        foreach (string workItem in workItems)
        {
            // Create a task for each work item (fan-out)
            Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
                nameof(ProcessWorkItemActivity), workItem);
            processingTasks.Add(task);
        }

        // Step 2: Wait for all parallel tasks to complete
        Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

        // Step 3: Fan-in by aggregating all results
        Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
            nameof(AggregateResultsActivity), results);

        return aggregatedResults;
    }
}

Gebruik Task.WhenAll() dit om te wachten tot alle parallelle taken zijn voltooid. De Durable Task SDK zorgt ervoor dat de taken gelijktijdig op meerdere computers kunnen worden uitgevoerd en dat de uitvoering bestand is tegen het opnieuw opstarten van processen.

Activiteiten

De helperactiviteitsfuncties zijn reguliere functies die gebruikmaken van de activityTrigger binding.

E2_GetFileList activiteitsfunctie

[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

Opmerking

Plaats deze code niet in de orchestratorfunctie. Orchestrator-functies mogen geen I/O uitvoeren, inclusief toegang tot het lokale bestandssysteem. Zie Codebeperkingen voor Orchestrator-functies voor meer informatie.

E2_CopyFileToBlob activiteitsfunctie

[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

Opmerking

Als u de voorbeeldcode wilt uitvoeren, installeert u het NuGet-pakket Microsoft.Azure.WebJobs.Extensions.Storage.

De functie gebruikt Azure Functions bindingsfuncties zoals de parameter Binder. U hebt deze details niet nodig voor deze stap-voor-stap instructies.

De implementatie laadt het bestand vanaf de schijf en streamt de inhoud asynchroon naar een blob met dezelfde naam in de backups container. De functie retourneert het aantal bytes dat naar de opslag is gekopieerd. De orchestrator gebruikt die waarde om de geaggregeerde som te berekenen.

Opmerking

In dit voorbeeld worden I/O-bewerkingen verplaatst naar een activityTrigger functie. Het werk kan worden uitgevoerd op meerdere computers en ondersteunt voortgangscontrolepunten. Als het hostproces eindigt, weet u welke uploads zijn voltooid.

Activiteiten doen het werk. In tegenstelling tot orchestrators kunnen activiteiten I/O-bewerkingen en niet-deterministische logica uitvoeren.

Verwerk werkitemactiviteit

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    private readonly ILogger<ProcessWorkItemActivity> _logger;

    public ProcessWorkItemActivity(ILogger<ProcessWorkItemActivity> logger)
    {
        _logger = logger;
    }

    public override Task<Dictionary<string, int>> RunAsync(TaskActivityContext context, string workItem)
    {
        _logger.LogInformation("Processing work item: {WorkItem}", workItem);

        // Process the work item (this is where you do the actual work)
        var result = new Dictionary<string, int>
        {
            { workItem, workItem.Length }
        };

        return Task.FromResult(result);
    }
}

Activiteit van geaggregeerde resultaten

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    private readonly ILogger<AggregateResultsActivity> _logger;

    public AggregateResultsActivity(ILogger<AggregateResultsActivity> logger)
    {
        _logger = logger;
    }

    public override Task<Dictionary<string, int>> RunAsync(
        TaskActivityContext context, Dictionary<string, int>[] results)
    {
        _logger.LogInformation("Aggregating {Count} results", results.Length);

        // Combine all results into one aggregated result
        var aggregatedResult = new Dictionary<string, int>();

        foreach (var result in results)
        {
            foreach (var kvp in result)
            {
                aggregatedResult[kvp.Key] = kvp.Value;
            }
        }

        return Task.FromResult(aggregatedResult);
    }
}

Het voorbeeld uitvoeren

Start de orchestratie op Windows door de volgende HTTP POST-verzoek te verzenden.

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

U kunt ook in een Linux-functie-app de indeling starten door de volgende HTTP POST-aanvraag te verzenden. Python wordt momenteel uitgevoerd op Linux voor App Service:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

Opmerking

De HttpStart functie verwacht JSON. Neem de Content-Type: application/json header op en codeer het mappad als een JSON-tekenreeks. In het HTTP-fragment wordt ervan uitgegaan dathost.json een vermelding heeft waarmee het standaardvoorvoegsel api/ wordt verwijderd uit alle URL's van de HTTP-triggerfunctie. Zoek de opmaak voor deze configuratie in het voorbeeldbestandhost.json .

Deze HTTP-aanvraag activeert de E2_BackupSiteContent orchestrator en geeft de tekenreeks D:\home\LogFiles door als een parameter. Het antwoord heeft een koppeling om de status van de back-upbewerking te controleren:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

Afhankelijk van het aantal logboekbestanden in uw functie-app kan het enkele minuten duren voordat deze bewerking is voltooid. Haal de meest recente status op door een query uit te voeren op de URL in de Location header van het vorige HTTP 202-antwoord:

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

In dit geval wordt de functie nog steeds uitgevoerd. Het antwoord toont de invoer die is opgeslagen in de orchestratorstatus en de laatst bijgewerkte tijd. Gebruik de Location headerwaarde om de voltooiing te controleren. Wanneer de status 'Voltooid' is, lijkt het antwoord op het volgende voorbeeld:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

Het antwoord laat zien dat de orkestratie is voltooid en de geschatte tijd om te voltooien. Het output veld geeft aan dat de indeling ongeveer 450 kB aan logboeken heeft geüpload.

Ga als volgt te werk om het voorbeeld uit te voeren:

  1. Start de Durable Task Scheduler-emulator voor lokale ontwikkeling.

    docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
    
  2. Start de worker om de orkestrator en activiteiten te registreren.

  3. Voer de client uit om een orkestratie in te plannen met een werkitemlijst:

// Schedule the orchestration with a list of work items
var workItems = new List<string> { "item1", "item2", "item3", "item4", "item5" };
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    nameof(ParallelProcessingOrchestration), workItems);

// Wait for completion
var result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true);
Console.WriteLine($"Result: {result.ReadOutputAs<Dictionary<string, int>>().Count} items processed");

Volgende stappen 

In dit voorbeeld ziet u het fan-out/fan-in patroon. In het volgende voorbeeld ziet u hoe u het monitorpatroon implementeert met duurzame timers.

In dit artikel wordt het patroon fan-out/fan-in gedemonstreert. Verken meer patronen en functies:

Zie de Durable Task JavaScript SDK-voorbeelden voor JavaScript SDK-voorbeelden.