Queue サービスを作成する

Queue サービスは、実行時間の長いサービスの良い例です。作業項目をキューに入れ、前の作業項目が完了してから順番に処理できます。 Worker サービス テンプレートに依存して、BackgroundService の上に新しい機能を構築します。

このチュートリアルでは、以下の内容を学習します。

  • Queue サービスを作成します。
  • タスク キューに作業を委任します。
  • IHostApplicationLifetime イベントからコンソール キー リスナーを登録します。

ヒント

".NET でのワーカー" のサンプル ソース コードはすべて、サンプル ブラウザーでダウンロードできます。 詳細については、コード サンプルの参照: .NET でのワーカーに関するページをご覧ください。

必須コンポーネント

新しいプロジェクトを作成する

Visual Studio を使用して新しい Worker サービス プロジェクトを作成するには、[ファイル][新規][プロジェクト] を選択します。[新しいプロジェクトの作成] ダイアログで "Worker サービス" を検索し、Worker サービス テンプレートを選択します。 .NET CLI を使用する場合は、作業ディレクトリで好みのターミナルを開きます。 dotnet new コマンドを実行し、<Project.Name> を目的のプロジェクト名に置き換えます。

dotnet new worker --name <Project.Name>

.NET CLI の new worker サービス プロジェクト コマンドの詳細については、「dotnet new worker」を参照してください。

ヒント

Visual Studio Code を使用している場合は、統合ターミナルから .NET CLI コマンドを実行できます。 詳細については、Visual Studio Code の統合ターミナルに関する記事を参照してください。

キュー サービスを作成する

System.Web.Hosting 名前空間の QueueBackgroundWorkItem(Func<CancellationToken,Task>) 機能についてよく理解しているかもしれません。 この機能にインスピレーションを得てサービスをモデル化するには、まず、IBackgroundTaskQueue インターフェイスをプロジェクトに追加します。

namespace App.QueueService;

public interface IBackgroundTaskQueue
{
    ValueTask QueueBackgroundWorkItemAsync(
        Func<CancellationToken, ValueTask> workItem);

    ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken);
}

2 つのメソッドがあります。1 つはキュー機能を公開し、もう 1 つは以前にキューに入れられた作業項目をデキューします。 "作業項目" は Func<CancellationToken, ValueTask> です。 次に、既定の実装をプロジェクトに追加します。

using System.Threading.Channels;

namespace App.QueueService;

public sealed class DefaultBackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

    public DefaultBackgroundTaskQueue(int capacity)
    {
        BoundedChannelOptions options = new(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
    }

    public async ValueTask QueueBackgroundWorkItemAsync(
        Func<CancellationToken, ValueTask> workItem)
    {
        if (workItem is null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        await _queue.Writer.WriteAsync(workItem);
    }

    public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken)
    {
        Func<CancellationToken, ValueTask>? workItem =
            await _queue.Reader.ReadAsync(cancellationToken);

        return workItem;
    }
}

上記の実装は、キューとして Channel<T> に依存しています。 BoundedChannelOptions(Int32) は、明示的な容量で呼び出されます。 容量は、予想されるアプリケーションの負荷と、キューにアクセスする同時実行スレッドの数に基づいて、設定する必要があります。 BoundedChannelFullMode.Wait によって、タスクを返すために ChannelWriter<T>.WriteAsync が呼び出されます。これは、領域が使用可能になった場合にのみ完了します。 蓄積を始めるパブリッシャーや呼び出しが多すぎる場合、これによりバックプレッシャが発生します。

Worker クラスを書き換える

次の QueueHostedService の例では以下のようになります。

  • ProcessTaskQueueAsync メソッドからは、ExecuteAsyncTask が返されます。
  • ProcessTaskQueueAsync で、キュー内のバックグラウンド タスクがデキューされ、実行されます。
  • 作業項目が待機されてから、StopAsync でサービスが停止します。

既存の Worker クラスを次の C# コードに置き換え、ファイルの名前を QueueHostedService.cs に変更します。

namespace App.QueueService;

public sealed class QueuedHostedService : BackgroundService
{
    private readonly IBackgroundTaskQueue _taskQueue;
    private readonly ILogger<QueuedHostedService> _logger;

    public QueuedHostedService(
        IBackgroundTaskQueue taskQueue,
        ILogger<QueuedHostedService> logger) =>
        (_taskQueue, _logger) = (taskQueue, logger);

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation(
            $"{nameof(QueuedHostedService)} is running.{Environment.NewLine}" +
            $"{Environment.NewLine}Tap W to add a work item to the " +
            $"background queue.{Environment.NewLine}");

        return ProcessTaskQueueAsync(stoppingToken);
    }

    private async Task ProcessTaskQueueAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                Func<CancellationToken, ValueTask>? workItem =
                    await _taskQueue.DequeueAsync(stoppingToken);

                await workItem(stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Prevent throwing if stoppingToken was signaled
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error occurred executing task work item.");
            }
        }
    }

    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation(
            $"{nameof(QueuedHostedService)} is stopping.");

        await base.StopAsync(stoppingToken);
    }
}

MonitorLoop サービスは、w キーが入力デバイスで選択されると常に、ホステッド サービスのためにタスクのエンキューを処理します。

  • IBackgroundTaskQueueMonitorLoop サービスに挿入されます。
  • IBackgroundTaskQueue.QueueBackgroundWorkItemAsync が呼び出され、作業項目がエンキューされます。
  • 作業項目により、実行時間の長いバックグラウンド タスクがシミュレートされます。
    • 5 秒間の遅延が 3 回実行されます (Delay)。
    • タスクが取り消された場合、try-catch ステートメントによって OperationCanceledException がトラップされます。
namespace App.QueueService;

public sealed class MonitorLoop
{
    private readonly IBackgroundTaskQueue _taskQueue;
    private readonly ILogger<MonitorLoop> _logger;
    private readonly CancellationToken _cancellationToken;

    public MonitorLoop(
        IBackgroundTaskQueue taskQueue,
        ILogger<MonitorLoop> logger,
        IHostApplicationLifetime applicationLifetime)
    {
        _taskQueue = taskQueue;
        _logger = logger;
        _cancellationToken = applicationLifetime.ApplicationStopping;
    }

    public void StartMonitorLoop()
    {
        _logger.LogInformation($"{nameof(MonitorAsync)} loop is starting.");

        // Run a console user input loop in a background thread
        Task.Run(async () => await MonitorAsync());
    }

    private async ValueTask MonitorAsync()
    {
        while (!_cancellationToken.IsCancellationRequested)
        {
            var keyStroke = Console.ReadKey();
            if (keyStroke.Key == ConsoleKey.W)
            {
                // Enqueue a background work item
                await _taskQueue.QueueBackgroundWorkItemAsync(BuildWorkItemAsync);
            }
        }
    }

    private async ValueTask BuildWorkItemAsync(CancellationToken token)
    {
        // Simulate three 5-second tasks to complete
        // for each enqueued work item

        int delayLoop = 0;
        var guid = Guid.NewGuid();

        _logger.LogInformation("Queued work item {Guid} is starting.", guid);

        while (!token.IsCancellationRequested && delayLoop < 3)
        {
            try
            {
                await Task.Delay(TimeSpan.FromSeconds(5), token);
            }
            catch (OperationCanceledException)
            {
                // Prevent throwing if the Delay is cancelled
            }

            ++ delayLoop;

            _logger.LogInformation("Queued work item {Guid} is running. {DelayLoop}/3", guid, delayLoop);
        }

        string format = delayLoop switch
        {
            3 => "Queued Background Task {Guid} is complete.",
            _ => "Queued Background Task {Guid} was cancelled."
        };

        _logger.LogInformation(format, guid);
    }
}

既存の Program の内容を、次の C# コードに置き換えます。

using App.QueueService;

using IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices((context, services) =>
    {
        services.AddSingleton<MonitorLoop>();
        services.AddHostedService<QueuedHostedService>();
        services.AddSingleton<IBackgroundTaskQueue>(_ => 
        {
            if (!int.TryParse(context.Configuration["QueueCapacity"], out var queueCapacity))
            {
                queueCapacity = 100;
            }

            return new DefaultBackgroundTaskQueue(queueCapacity);
        });
    })
    .Build();

MonitorLoop monitorLoop = host.Services.GetRequiredService<MonitorLoop>()!;
monitorLoop.StartMonitorLoop();

await host.RunAsync();

サービスは IHostBuilder.ConfigureServices (Program.cs) に登録されています。 ホステッド サービスは、AddHostedService 拡張メソッドを使用して登録されます。 MonitorLoop は、Program.cs のトップレベル ステートメントで開始されます。

MonitorLoop monitorLoop = host.Services.GetRequiredService<MonitorLoop>()!;
monitorLoop.StartMonitorLoop();

サービスの登録の詳細については、「.NET での依存関係の挿入」を参照してください。

サービスの機能を確認する

Visual Studio からアプリケーションを実行するには、F5 キーを押すか、 [デバッグ]>[デバッグの開始] メニュー オプションを選択します。 .NET CLI を使用している場合は、作業ディレクトリから dotnet run コマンドを実行します。

dotnet run

.NET CLI の run コマンドの詳細については、「dotnet run」を参照してください。

メッセージが表示されたら、w (または W) を少なくとも 1 回入力して、エミュレートされた作業項目をキューに入れます。 次のような出力が表示されます。

info: App.QueueService.MonitorLoop[0]
      MonitorAsync loop is starting.
info: App.QueueService.QueuedHostedService[0]
      QueuedHostedService is running.

      Tap W to add a work item to the background queue.

info: Microsoft.Hosting.Lifetime[0]
      Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
      Hosting environment: Development
info: Microsoft.Hosting.Lifetime[0]
      Content root path: .\queue-service
winfo: App.QueueService.MonitorLoop[0]
      Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is starting.
info: App.QueueService.MonitorLoop[0]
      Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 1/3
info: App.QueueService.MonitorLoop[0]
      Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 2/3
info: App.QueueService.MonitorLoop[0]
      Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 3/3
info: App.QueueService.MonitorLoop[0]
      Queued Background Task 8453f845-ea4a-4bcb-b26e-c76c0d89303e is complete.
info: Microsoft.Hosting.Lifetime[0]
      Application is shutting down...
info: App.QueueService.QueuedHostedService[0]
      QueuedHostedService is stopping.

Visual Studio 内からアプリケーションを実行している場合は、 [デバッグ]>[デバッグの停止] を選択します。または、コンソール ウィンドウで Ctrl + C キーを押して、キャンセルを通知します。

関連項目