Freigeben über


Fan-out/Fan-In-Szenario

Fan-out/Fan-In führt mehrere Aktivitäten parallel aus und aggregiert dann die Ergebnisse. In diesem Artikel wird gezeigt, wie Sie das Muster mithilfe der SdKs für dauerhafte Aufgaben für .NET, JavaScript, Python und Java implementieren.

Beschreibung des Szenarios

In diesem Beispiel laden die Funktionen alle Dateien unter einem angegebenen Verzeichnis (rekursiv) in blob Storage hoch. Sie zählen auch die Gesamtanzahl der hochgeladenen Bytes.

Eine einzelne Funktion kann alles verarbeiten, aber sie skaliert nicht. Eine einzelne Funktionsausführung wird auf einem virtuellen Computer (VM) ausgeführt, sodass der Durchsatz auf diese VM beschränkt ist. Zuverlässigkeit ist ein weiterer Aspekt. Wenn der Prozess in der Mitte fehlschlägt oder mehr als fünf Minuten dauert, kann die Sicherung in einem teilweise abgeschlossenem Zustand enden. Anschließend starten Sie die Sicherung neu.

Ein robusterer Ansatz besteht darin, zwei separate Funktionen zu verwenden: eine listet die Dateien auf und fügt einer Warteschlange Dateinamen hinzu, und die anderen liest aus der Warteschlange und lädt die Dateien in BLOB-Speicher hoch. Dieser Ansatz verbessert den Durchsatz und die Zuverlässigkeit, aber Sie müssen die Warteschlange einrichten und verwalten. Entscheidender ist, dass dieser Ansatz die Komplexität bei der Verwaltung und Koordination des Systemzustands erhöht, wie beispielsweise bei der Berichterstattung über die Gesamtanzahl der hochgeladenen Bytes.

Durable Functions bietet all diese Vorteile mit geringem Aufwand.

Im folgenden Beispiel verarbeitet der Orchestrator mehrere Arbeitsaufgaben parallel und aggregiert dann die Ergebnisse. Dieses Muster ist nützlich, wenn Sie Folgendes benötigen:

  • Verarbeiten einer Reihe von Elementen, bei denen jedes Element unabhängig voneinander verarbeitet werden kann
  • Verteilen von Arbeit auf mehreren Computern für einen besseren Durchsatz
  • Aggregierte Ergebnisse aus allen parallelen Vorgängen

Ohne das Lüfter-/Fan-In-Muster verarbeiten Sie elemente entweder sequenziell, wodurch der Durchsatz begrenzt wird, oder Sie verwalten Ihre eigene Warteschlangen- und Koordinationslogik, wodurch Komplexität hinzugefügt wird.

Die SDKs für dauerhafte Aufgaben behandeln Parallelisierung und Koordination, sodass das Muster einfach zu implementieren ist.

Die Funktionen

In diesem Artikel werden die Funktionen in der Beispiel-App beschrieben:

  • E2_BackupSiteContent: Eine Orchestratorfunktion , die aufruft E2_GetFileList , um eine Liste der zu sichernden Dateien abzurufen, und dann für jede Datei aufruft E2_CopyFileToBlob .
  • E2_GetFileList: Eine Aktivitäts Funktion, die eine Liste der Dateien in einem Verzeichnis zurückgibt.
  • E2_CopyFileToBlob: Eine Aktivitätsfunktion, die eine einzelne Datei in den Azure-Blob-Speicher sichert.

In diesem Artikel werden die Komponenten im Beispielcode beschrieben:

  • ParallelProcessingOrchestration, fanOutFanInOrchestrator, fan_out_fan_in_orchestrator oder FanOutFanIn_WordCount: Ein Orchestrator, der die Arbeit an mehreren Aktivitäten parallel ausführt, wartet, bis alle Aktivitäten abgeschlossen sind, und konsolidiert dann, indem die Ergebnisse aggregiert werden.
  • ProcessWorkItemActivity, processWorkItem, oder process_work_itemCountWords: Eine Aktivität, die eine einzelne Arbeitsaufgabe verarbeitet.
  • AggregateResultsActivity, aggregateResultsoder aggregate_results: Eine Aktivität, die Ergebnisse aus allen parallelen Vorgängen aggregiert.

Orchestrator

Diese Orchestratorfunktion führt folgende Aktionen aus:

  1. Nimmt rootDirectory als Eingabe an.
  2. Sie ruft eine Funktion auf, um unter rootDirectory eine rekursive Liste mit Dateien abzurufen.
  3. Führt parallele Funktionsaufrufe aus, um jede Datei in Azure Blob Storage hochzuladen.
  4. Sie wartet, bis alle Uploads abgeschlossen wurden.
  5. Gibt die Gesamtanzahl der in Azure Blob Storage hochgeladenen Bytes zurück.

Im Folgenden wird der Code dargestellt, der die Orchestratorfunktion implementiert:

[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

Beachten Sie die Zeile await Task.WhenAll(tasks);. Der Code wartet nicht auf die einzelnen Aufrufe E2_CopyFileToBlob, sodass sie parallel ausgeführt werden. Wenn der Orchestrator das Aufgabenarray an Task.WhenAll übergibt, wird eine Aufgabe zurückgegeben, die erst dann abgeschlossen wird, wenn alle Kopiervorgänge abgeschlossen sind. Wenn Sie mit der Task Parallel Library (TPL) in .NET vertraut sind, ist dieses Muster vertraut. Der Unterschied besteht darin, dass diese Aufgaben gleichzeitig auf mehreren virtuellen Computern ausgeführt werden können, und die Durable Functions Erweiterung stellt sicher, dass die End-to-End-Ausführung widerstandsfähig für die Prozesswiederverwendung ist.

Nachdem der Orchestrator abgewartet hat Task.WhenAll, sind alle Funktionsaufrufe abgeschlossen und geben Werte zurück. Bei jedem Aufruf von E2_CopyFileToBlob wird die Anzahl der hochgeladenen Bytes zurückgegeben. Berechnen Sie die Summe, indem Sie die Rückgabewerte hinzufügen.

Der Orchestrator führt die folgenden Aktionen aus:

  1. Verwendet eine Liste der Arbeitsaufgaben als Eingabe.
  2. Fans out, indem sie eine Aufgabe für jede Arbeitsaufgabe erstellen und parallel verarbeiten.
  3. Wartet auf den Abschluss aller parallelen Aufgaben.
  4. Fans durch Aggregieren der Ergebnisse.
using Microsoft.DurableTask;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class ParallelProcessingOrchestration : TaskOrchestrator<List<string>, Dictionary<string, int>>
{
    public override async Task<Dictionary<string, int>> RunAsync(
        TaskOrchestrationContext context, List<string> workItems)
    {
        // Step 1: Fan-out by creating a task for each work item in parallel
        var processingTasks = new List<Task<Dictionary<string, int>>>();

        foreach (string workItem in workItems)
        {
            // Create a task for each work item (fan-out)
            Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
                nameof(ProcessWorkItemActivity), workItem);
            processingTasks.Add(task);
        }

        // Step 2: Wait for all parallel tasks to complete
        Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

        // Step 3: Fan-in by aggregating all results
        Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
            nameof(AggregateResultsActivity), results);

        return aggregatedResults;
    }
}

Verwenden Sie Task.WhenAll(), um auf den Abschluss aller parallelen Aufgaben zu warten. Das Durable Task SDK stellt sicher, dass die Aufgaben gleichzeitig auf mehreren Computern ausgeführt werden können, und die Ausführung ist robust für Neustarts.

Aktivitäten

Die Hilfsaktivitätsfunktionen sind normale Funktionen, die die activityTrigger Bindung verwenden.

Aktivitätsfunktion „E2_GetFileList“

[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

Hinweis

Fügen Sie diesen Code nicht in die Orchestratorfunktion ein. Orchestrator-Funktionen sollten keine E/A ausführen, einschließlich des lokalen Dateisystemzugriffs. Weitere Informationen finden Sie unter Codeeinschränkungen für Orchestratorfunktionen.

Aktivitätsfunktion „E2_CopyFileToBlob“

[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

Hinweis

Um den Beispielcode auszuführen, installieren Sie das Microsoft.Azure.WebJobs.Extensions.Storage NuGet-Paket.

Die Funktion verwendet Azure Functions Bindungsfeatures wie den Binder Parameter. Für diese Anleitung benötigen Sie solche Details nicht.

Die Implementierung lädt die Datei vom Datenträger und streamt den Inhalt asynchron in einen Blob mit demselben Namen im backups Container. Die Funktion gibt die Anzahl der in den Speicher kopierten Bytes zurück. Der Orchestrator verwendet diesen Wert, um die Aggregatsumme zu berechnen.

Hinweis

In diesem Beispiel werden E/A-Vorgänge in eine activityTrigger Funktion verschoben. Die Arbeit kann auf mehreren Computern ausgeführt werden und unterstützt die Fortschrittsüberprüfung. Wenn der Hostprozess endet, wissen Sie, welche Uploads abgeschlossen sind.

Aktivitäten erledigen die Arbeit. Im Gegensatz zu Orchestratoren können Aktivitäten E/A-Vorgänge und nicht deterministische Logik ausführen.

Arbeitsaufgabe verarbeiten

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    private readonly ILogger<ProcessWorkItemActivity> _logger;

    public ProcessWorkItemActivity(ILogger<ProcessWorkItemActivity> logger)
    {
        _logger = logger;
    }

    public override Task<Dictionary<string, int>> RunAsync(TaskActivityContext context, string workItem)
    {
        _logger.LogInformation("Processing work item: {WorkItem}", workItem);

        // Process the work item (this is where you do the actual work)
        var result = new Dictionary<string, int>
        {
            { workItem, workItem.Length }
        };

        return Task.FromResult(result);
    }
}

Aggregierte Ergebnisaktivität

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    private readonly ILogger<AggregateResultsActivity> _logger;

    public AggregateResultsActivity(ILogger<AggregateResultsActivity> logger)
    {
        _logger = logger;
    }

    public override Task<Dictionary<string, int>> RunAsync(
        TaskActivityContext context, Dictionary<string, int>[] results)
    {
        _logger.LogInformation("Aggregating {Count} results", results.Length);

        // Combine all results into one aggregated result
        var aggregatedResult = new Dictionary<string, int>();

        foreach (var result in results)
        {
            foreach (var kvp in result)
            {
                aggregatedResult[kvp.Key] = kvp.Value;
            }
        }

        return Task.FromResult(aggregatedResult);
    }
}

Beispiel ausführen

Starten Sie die Orchestrierung auf Windows, indem Sie die folgende HTTP POST-Anforderung senden:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

Alternativ können Sie in einer Linux-Funktions-App die Orchestrierung starten, indem Sie die folgende HTTP POST-Anforderung senden. Python wird derzeit unter Linux für App Service ausgeführt:

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

Hinweis

Die HttpStart Funktion erwartet JSON. Fügen Sie den Content-Type: application/json Header ein, und codieren Sie den Verzeichnispfad als JSON-Zeichenfolge. Der HTTP-Codeausschnitt geht davon aus, dasshost.json einen Eintrag enthält, der das Standardpräfix api/ aus allen HTTP-Triggerfunktions-URLs entfernt. Suchen Sie das Markup für diese Konfiguration im Beispiel host.json Datei.

Diese HTTP-Anforderung löst den Orchestrator E2_BackupSiteContent aus und übergibt die Zeichenfolge D:\home\LogFiles als Parameter. Die Antwort verfügt über einen Link, um den Status des Sicherungsvorgangs zu überprüfen:

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

Je nach Anzahl der Protokolldateien in Ihrer Funktions-App kann dieser Vorgang mehrere Minuten dauern. Rufen Sie den neuesten Status ab, indem Sie die URL im Location Header der vorherigen HTTP 202-Antwort abfragen:

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

In diesem Fall wird die Funktion weiterhin ausgeführt. Die Antwort zeigt die im Orchestratorzustand gespeicherte Eingabe und die uhrzeit der letzten Aktualisierung an. Verwenden Sie den Location Kopfzeilenwert, um den Abschluss abzufragen. Wenn der Status "Abgeschlossen" ist, ähnelt die Antwort dem folgenden Beispiel:

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

Die Antwort zeigt, dass die Orchestrierung abgeschlossen ist und die ungefähre Endzeit. Das output Feld gibt an, dass die Orchestrierung ca. 450 KB Protokolle hochgeladen hat.

So führen Sie das Beispiel aus:

  1. Starten Sie den Emulator "Durable Task Scheduler " für die lokale Entwicklung.

    docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
    
  2. Starten Sie den Worker , um den Orchestrator und die Aktivitäten zu registrieren.

  3. Führen Sie den Client aus, um eine Orchestrierung mit einer Liste von Arbeitsaufgaben zu planen:

// Schedule the orchestration with a list of work items
var workItems = new List<string> { "item1", "item2", "item3", "item4", "item5" };
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    nameof(ParallelProcessingOrchestration), workItems);

// Wait for completion
var result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true);
Console.WriteLine($"Result: {result.ReadOutputAs<Dictionary<string, int>>().Count} items processed");

Nächste Schritte

Dieses Beispiel zeigt das Lüfter-Out-/Fan-In-Muster. Das nächste Beispiel zeigt, wie das Monitormuster mit dauerhaften Timern implementiert wird.

In diesem Artikel wird das Fan-Out/Fan-In-Muster veranschaulicht. Entdecken Sie weitere Muster und Features:

Beispiele für javaScript SDK finden Sie in den Beispielen für das Durable Task JavaScript SDK.