次の方法で共有


ファンアウト/ファンイン シナリオ

ファンアウト/ファンインでは 、複数のアクティビティが並列で実行され、結果が集計されます。 この記事では、.NET、JavaScript、Python、Java用の Durable Task SDK を使用してパターンを実装する方法について説明します。

シナリオの概要

このサンプルでは、関数は指定されたディレクトリのすべてのファイルを BLOB ストレージに (再帰的に) アップロードします。 また、アップロードされた合計バイト数もカウントされます。

1 つの関数ですべてを処理できますが、スケーリングは行われません。 1 つの関数の実行は 1 つの仮想マシン (VM) で実行されるため、スループットはその VM に制限されます。 信頼性はもう 1 つの懸念事項です。 プロセスが途中で失敗した場合、または 5 分を超える時間がかかる場合、バックアップは部分的に完了した状態で終了する可能性があります。 その後、バックアップを再起動します。

より堅牢な方法は、2 つの異なる関数を使用することです。1 つはファイルを列挙してキューにファイル名を追加し、もう 1 つはキューから読み取り、ファイルを BLOB ストレージにアップロードすることです。 この方法ではスループットと信頼性が向上しますが、キューを設定して管理する必要があります。 さらに重要なのは、この方法では、アップロードされた合計バイト数の報告など、状態管理と調整の複雑さが増します。

Durable Functionsは、これらすべての利点をほとんどオーバーヘッドなく提供します。

次の例では、オーケストレーターは複数の作業項目を並行して処理し、結果を集計します。 このパターンは、次の操作を行う必要がある場合に便利です。

  • 各項目を個別に処理できる項目のバッチを処理する
  • スループットを向上させるために複数のマシンに作業を分散する
  • すべての並列操作の結果を集計する

ファンアウト/ファンイン パターンがない場合は、スループットを制限する項目を順番に処理するか、独自のキューと調整ロジックを管理して複雑さを増します。

Durable Task SDK は並列化と調整を処理するため、パターンは簡単に実装できます。

関数

この記事では、サンプル アプリの関数について説明します。

  • E2_BackupSiteContent: を呼び出してバックアップするファイルの一覧を取得し、各ファイルのE2_GetFileListを呼び出すE2_CopyFileToBlob
  • E2_GetFileList:ディレクトリ内のファイルのリストを返すアクティビティ関数
  • E2_CopyFileToBlob: 1 つのファイルをAzure Blob Storageにバックアップするアクティビティ関数。

この記事では、コード例のコンポーネントについて説明します。

  • ParallelProcessingOrchestration, fanOutFanInOrchestrator, fan_out_fan_in_orchestrator, または FanOutFanIn_WordCount: 複数のアクティビティに対して同時に作業を行い、すべてのアクティビティが完了するまで待機し、結果を集計するオーケストレーター。
  • ProcessWorkItemActivityprocessWorkItemprocess_work_item、または CountWords: 1 つの作業項目を処理するアクティビティ。
  • AggregateResultsActivityaggregateResults、または aggregate_results: すべての並列操作の結果を集計するアクティビティ。

Orchestrator

このオーケストレーター関数は、次の処理を行います。

  1. rootDirectoryを入力として受け取ります。
  2. rootDirectory の下のファイルの再帰リストを取得する関数を呼び出します。
  3. 各ファイルをAzure Blob Storageにアップロードする並列関数呼び出しを行います。
  4. すべてのアップロードが完了するまで待機します。
  5. Azure Blob Storageにアップロードされた合計バイト数を返します。

オーケストレーター関数を実装するコードを次に示します。

分離モデル
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask;

namespace SampleApp;

public static class BackupSiteContent
{
    [Function("E2_BackupSiteContent")]
    public static async Task<long> Run(
        [OrchestrationTrigger] TaskOrchestrationContext context)
    {
        string rootDirectory = context.GetInput<string>()?.Trim();
        if (string.IsNullOrEmpty(rootDirectory))
        {
            rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location)!.FullName;
        }

        string[] files = await context.CallActivityAsync<string[]>("E2_GetFileList", rootDirectory);

        Task<long>[] tasks = files
            .Select(file => context.CallActivityAsync<long>("E2_CopyFileToBlob", file))
            .ToArray();

        long[] results = await Task.WhenAll(tasks);
        return results.Sum();
    }
}

await Task.WhenAll(tasks); 行に注目してください。 コードは、 E2_CopyFileToBlobへの個々の呼び出しを待機しないため、並列で実行されます。 オーケストレーターは、タスク配列を Task.WhenAllに渡すと、すべてのコピー操作が完了するまで完了しないタスクを返します。 .NETのタスク並列ライブラリ (TPL) に慣れている場合、このパターンは使い慣れたパターンです。 違いは、これらのタスクが複数の仮想マシンで同時に実行される可能性があり、Durable Functions拡張機能により、エンドツーエンドの実行がプロセスのリサイクルに対する回復性を確保できることです。

オーケストレーターが Task.WhenAllを待機すると、すべての関数呼び出しが完了し、値が返されます。 E2_CopyFileToBlobを呼び出すたびに、アップロードされたバイト数が返されます。 戻り値を追加して合計を計算します。


プロセス内モデル
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run(
    [OrchestrationTrigger] IDurableOrchestrationContext backupContext)
{
    string rootDirectory = backupContext.GetInput<string>()?.Trim();
    if (string.IsNullOrEmpty(rootDirectory))
    {
        rootDirectory = Directory.GetParent(typeof(BackupSiteContent).Assembly.Location).FullName;
    }

    string[] files = await backupContext.CallActivityAsync<string[]>(
        "E2_GetFileList",
        rootDirectory);

    var tasks = new Task<long>[files.Length];
    for (int i = 0; i < files.Length; i++)
    {
        tasks[i] = backupContext.CallActivityAsync<long>(
            "E2_CopyFileToBlob",
            files[i]);
    }

    await Task.WhenAll(tasks);

    long totalBytes = tasks.Sum(t => t.Result);
    return totalBytes;
}

インプロセス モデルのサンプルでは、非推奨のインプロセス パッケージが使用されています。 上記のコードは、推奨される .NET 分離ワーカー モデルを示しています。


オーケストレーターは次の処理を行います。

  1. 作業項目の一覧を入力として受け取ります。
  2. 作業項目ごとにタスクを作成し、それらを並列処理する方法でファンアウトします。
  3. すべての並列タスクが完了するまで待機します。
  4. 結果を集計してファンインします。
using Microsoft.DurableTask;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class ParallelProcessingOrchestration : TaskOrchestrator<List<string>, Dictionary<string, int>>
{
    public override async Task<Dictionary<string, int>> RunAsync(
        TaskOrchestrationContext context, List<string> workItems)
    {
        // Step 1: Fan-out by creating a task for each work item in parallel
        var processingTasks = new List<Task<Dictionary<string, int>>>();

        foreach (string workItem in workItems)
        {
            // Create a task for each work item (fan-out)
            Task<Dictionary<string, int>> task = context.CallActivityAsync<Dictionary<string, int>>(
                nameof(ProcessWorkItemActivity), workItem);
            processingTasks.Add(task);
        }

        // Step 2: Wait for all parallel tasks to complete
        Dictionary<string, int>[] results = await Task.WhenAll(processingTasks);

        // Step 3: Fan-in by aggregating all results
        Dictionary<string, int> aggregatedResults = await context.CallActivityAsync<Dictionary<string, int>>(
            nameof(AggregateResultsActivity), results);

        return aggregatedResults;
    }
}

Task.WhenAll()を使用して、すべての並列タスクが完了するまで待機します。 Durable Task SDK を使用すると、タスクを複数のマシンで同時に実行でき、実行は再起動を処理する回復性が確保されます。

アクティビティ

ヘルパー アクティビティ関数は、 activityTrigger バインディングを使用する通常の関数です。

E2_GetFileList アクティビティ関数

分離モデル
using System.IO;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace SampleApp;

public static class BackupSiteContent
{
    [Function("E2_GetFileList")]
    public static string[] GetFileList(
        [ActivityTrigger] string rootDirectory,
        FunctionContext executionContext)
    {
        ILogger logger = executionContext.GetLogger("E2_GetFileList");
        logger.LogInformation("Searching for files under '{RootDirectory}'...", rootDirectory);

        string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
        logger.LogInformation("Found {FileCount} file(s) under {RootDirectory}.", files.Length, rootDirectory);

        return files;
    }
}

プロセス内モデル
[FunctionName("E2_GetFileList")]
public static string[] GetFileList(
    [ActivityTrigger] string rootDirectory, 
    ILogger log)
{
    log.LogInformation($"Searching for files under '{rootDirectory}'...");
    string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
    log.LogInformation($"Found {files.Length} file(s) under {rootDirectory}.");

    return files;
}

オーケストレーター関数にこのコードを配置しないでください。 オーケストレーター関数では、ローカル ファイル システム アクセスを含め、I/O を実行しないでください。 詳細については、「オーケストレーター関数コードの制約」を参照してください。

E2_CopyFileToBlob アクティビティ関数

分離モデル

サンプル コードを実行するには、Azure.Storage.Blobs NuGet パッケージをインストールします。

using System;
using System.IO;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace SampleApp;

public static class BackupSiteContent
{
    [Function("E2_CopyFileToBlob")]
    public static async Task<long> CopyFileToBlob(
        [ActivityTrigger] string filePath,
        FunctionContext executionContext)
    {
        ILogger logger = executionContext.GetLogger("E2_CopyFileToBlob");
        long byteCount = new FileInfo(filePath).Length;

        string blobPath = filePath
            .Substring(Path.GetPathRoot(filePath)!.Length)
            .Replace('\\', '/');
        string outputLocation = $"backups/{blobPath}";

        string? connectionString = Environment.GetEnvironmentVariable("AzureWebJobsStorage");
        if (string.IsNullOrEmpty(connectionString))
        {
            throw new InvalidOperationException("AzureWebJobsStorage is not configured.");
        }

        BlobContainerClient containerClient = new(connectionString, "backups");
        await containerClient.CreateIfNotExistsAsync();
        BlobClient blobClient = containerClient.GetBlobClient(blobPath);

        logger.LogInformation("Copying '{FilePath}' to '{OutputLocation}'. Total bytes = {ByteCount}.", filePath, outputLocation, byteCount);

        await using Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read);
        await blobClient.UploadAsync(source, overwrite: true);

        return byteCount;
    }
}

プロセス内モデル
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob(
    [ActivityTrigger] string filePath,
    Binder binder,
    ILogger log)
{
    long byteCount = new FileInfo(filePath).Length;

    // strip the drive letter prefix and convert to forward slashes
    string blobPath = filePath
        .Substring(Path.GetPathRoot(filePath).Length)
        .Replace('\\', '/');
    string outputLocation = $"backups/{blobPath}";

    log.LogInformation($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");

    // copy the file contents into a blob
    using (Stream source = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.Read))
    using (Stream destination = await binder.BindAsync<CloudBlobStream>(
        new BlobAttribute(outputLocation, FileAccess.Write)))
    {
        await source.CopyToAsync(destination);
    }

    return byteCount;
}

インプロセス モデル サンプルでは、 Microsoft.Azure.WebJobs.Extensions.Storage NuGet パッケージが必要であり、 Binder パラメーターなどの Azure Functions バインド機能を使用します。


実装では、ディスクからファイルを読み込み、 backups コンテナー内の同じ名前の BLOB にコンテンツを非同期的にストリーミングします。 この関数は、ストレージにコピーされたバイト数を返します。 オーケストレーターはその値を使用して集計合計を計算します。

次の使用例は、I/O 操作を activityTrigger 関数に移動します。 この作業は複数のマシンで実行でき、進行状況のチェックポイント処理をサポートします。 ホスト プロセスが終了すると、どのアップロードが完了しているかがわかります。

アクティビティが作業を行います。 オーケストレーターとは異なり、アクティビティは I/O 操作と非決定的ロジックを実行できます。

作業項目アクティビティの処理

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class ProcessWorkItemActivity : TaskActivity<string, Dictionary<string, int>>
{
    private readonly ILogger<ProcessWorkItemActivity> _logger;

    public ProcessWorkItemActivity(ILogger<ProcessWorkItemActivity> logger)
    {
        _logger = logger;
    }

    public override Task<Dictionary<string, int>> RunAsync(TaskActivityContext context, string workItem)
    {
        _logger.LogInformation("Processing work item: {WorkItem}", workItem);

        // Process the work item (this is where you do the actual work)
        var result = new Dictionary<string, int>
        {
            { workItem, workItem.Length }
        };

        return Task.FromResult(result);
    }
}

集計結果アクティビティ

using Microsoft.DurableTask;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;

[DurableTask]
public class AggregateResultsActivity : TaskActivity<Dictionary<string, int>[], Dictionary<string, int>>
{
    private readonly ILogger<AggregateResultsActivity> _logger;

    public AggregateResultsActivity(ILogger<AggregateResultsActivity> logger)
    {
        _logger = logger;
    }

    public override Task<Dictionary<string, int>> RunAsync(
        TaskActivityContext context, Dictionary<string, int>[] results)
    {
        _logger.LogInformation("Aggregating {Count} results", results.Length);

        // Combine all results into one aggregated result
        var aggregatedResult = new Dictionary<string, int>();

        foreach (var result in results)
        {
            foreach (var kvp in result)
            {
                aggregatedResult[kvp.Key] = kvp.Value;
            }
        }

        return Task.FromResult(aggregatedResult);
    }
}

サンプルを実行する

次の HTTP POST 要求を送信して、Windowsでオーケストレーションを開始します。

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"D:\\home\\LogFiles"

または、Linux 関数アプリで、次の HTTP POST 要求を送信してオーケストレーションを開始します。 現在、Pythonは Linux for App Service で実行されています。

POST http://{host}/orchestrators/E2_BackupSiteContent
Content-Type: application/json
Content-Length: 20

"/home/site/wwwroot"

HttpStart関数には JSON が必要です。 Content-Type: application/json ヘッダーを含め、ディレクトリ パスを JSON 文字列としてエンコードします。 HTTP スニペットは 、host.json に、すべての HTTP トリガー関数 URL から既定の api/ プレフィックスを削除するエントリがあることを前提としています。 サンプルのhost.jsonファイルで、この構成のマークアップ 見つけます。

この HTTP 要求で E2_BackupSiteContent オーケストレーターがトリガーされ、文字列 D:\home\LogFiles がパラメーターとして渡されます。 応答には、バックアップ操作の状態を確認するためのリンクがあります。

HTTP/1.1 202 Accepted
Content-Length: 719
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

(...trimmed...)

関数アプリのログ ファイルの数によっては、この操作が完了するまでに数分かかることがあります。 前の HTTP 202 応答の Location ヘッダーの URL に対してクエリを実行して、最新の状態を取得します。

GET http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}
HTTP/1.1 202 Accepted
Content-Length: 148
Content-Type: application/json; charset=utf-8
Location: http://{host}/runtime/webhooks/durabletask/instances/b4e9bdcc435d460f8dc008115ff0a8a9?taskHub=DurableFunctionsHub&connection=Storage&code={systemKey}

{"runtimeStatus":"Running","input":"D:\\home\\LogFiles","output":null,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:16Z"}

ここでは、関数はまだ実行中です。 応答には、オーケストレーター状態で保存された入力と、最後に更新された時刻が表示されます。 Location ヘッダー値を使用して、完了を確認します。 状態が "Completed" の場合、応答は次の例のようになります。

HTTP/1.1 200 OK
Content-Length: 152
Content-Type: application/json; charset=utf-8

{"runtimeStatus":"Completed","input":"D:\\home\\LogFiles","output":452071,"createdTime":"2019-06-29T18:50:55Z","lastUpdatedTime":"2019-06-29T18:51:26Z"}

応答は、オーケストレーションが完了し、おおよその完了時刻を示しています。 output フィールドは、オーケストレーションが約 450 KB のログをアップロードしたことを示します。

例を実行するには:

  1. ローカル開発用の Durable Task Scheduler エミュレーターを起動します。

    docker run -d -p 8080:8080 -p 8082:8082 --name dts-emulator mcr.microsoft.com/dts/dts-emulator:latest
    
  2. ワーカーを起動 してオーケストレーターとアクティビティを登録します。

  3. クライアントを実行 して、作業項目の一覧を含むオーケストレーションをスケジュールします。

// Schedule the orchestration with a list of work items
var workItems = new List<string> { "item1", "item2", "item3", "item4", "item5" };
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
    nameof(ParallelProcessingOrchestration), workItems);

// Wait for completion
var result = await client.WaitForInstanceCompletionAsync(instanceId, getInputsAndOutputs: true);
Console.WriteLine($"Result: {result.ReadOutputAs<Dictionary<string, int>>().Count} items processed");

次のステップ

このサンプルは、ファンアウト/ファンイン パターンを示しています。 次のサンプルでは、 永続的タイマーを使用してモニター パターンを実装する方法を示します。

この記事では、ファンアウト/ファンイン パターンについて説明します。 その他のパターンと機能を調べる:

JavaScript SDK の例については、 Durable Task JavaScript SDK のサンプルを参照してください。