创建队列服务

队列服务是长期运行服务的一个绝佳示例,它可以在前一个工作项完成后按顺序对工作项进行排队和处理。 依靠 Worker 服务模板,在BackgroundService基础上构建新功能。

本教程中,您将学习如何:

  • 创建队列服务。
  • 将工作委托给任务队列。
  • IHostApplicationLifetime 事件中注册控制台按键监听器。

小窍门

所有“Workers in .NET”示例源代码都可以在样例代码浏览器下载。 有关详细信息,请参阅 “浏览代码示例:.NET 中的 Workers”

先决条件

创建新项目

若要使用 Visual Studio 创建新的辅助角色服务项目,请选择“ 文件>新建>项目...”。从“ 创建新项目 ”对话框搜索“辅助角色服务”,然后选择“辅助角色服务”模板。 如果想要使用 .NET CLI,请在工作目录中打开你喜欢的终端。 运行dotnet new命令,并将<Project.Name>替换为您想要的项目名称。

dotnet new worker --name <Project.Name>

有关 .NET CLI 新辅助角色服务项目命令的详细信息,请参阅 dotnet new worker

小窍门

如果使用 Visual Studio Code,可以从集成终端运行 .NET CLI 命令。 有关详细信息,请参阅 Visual Studio Code:集成终端

创建排队服务

你可能熟悉 QueueBackgroundWorkItem(Func<CancellationToken,Task>) 命名空间中的 System.Web.Hosting 功能。

小窍门

System.Web 命名空间的功能未被有意移植到 .NET,且仍然是 .NET Framework 独有的。 有关详细信息,请参阅 从 ASP.NET 开始逐步迁移到 ASP.NET Core

在 .NET 中,若要为受 QueueBackgroundWorkItem 功能启发的服务建模,请首先向项目添加 IBackgroundTaskQueue 接口:

namespace App.QueueService;

public interface IBackgroundTaskQueue
{
    ValueTask QueueBackgroundWorkItemAsync(
        Func<CancellationToken, ValueTask> workItem);

    ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken);
}

有两种方法,一种方法公开队列功能,另一种方法取消排队之前排队的工作项。 工作项是一个 Func<CancellationToken, ValueTask>。 接下来,将默认实现添加到项目。

using System.Threading.Channels;

namespace App.QueueService;

public sealed class DefaultBackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

    public DefaultBackgroundTaskQueue(int capacity)
    {
        BoundedChannelOptions options = new(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
    }

    public async ValueTask QueueBackgroundWorkItemAsync(
        Func<CancellationToken, ValueTask> workItem)
    {
        ArgumentNullException.ThrowIfNull(workItem);

        await _queue.Writer.WriteAsync(workItem);
    }

    public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken)
    {
        Func<CancellationToken, ValueTask>? workItem =
            await _queue.Reader.ReadAsync(cancellationToken);

        return workItem;
    }
}

上述实现依赖于队列 Channel<T> 。 以显式容量调用 BoundedChannelOptions(Int32)。 应根据预期的应用程序负载和访问队列的并发线程数来设置容量。 BoundedChannelFullMode.Wait 导致调用 ChannelWriter<T>.WriteAsync 返回一个任务,该任务只有在有空间可用时才会完成。 这会导致回压,以防过多的发布者或调用开始累积。

重写 Worker 类

在以下 QueueHostedService 示例中:

  • ProcessTaskQueueAsync方法在ExecuteAsync中返回Task
  • ProcessTaskQueueAsync 中,取消排队并执行队列中的后台任务。
  • 服务在 StopAsync 中停止之前,将等待工作项。

将现有 Worker 类替换为以下 C# 代码,并将文件重命名为 QueueHostedService.cs

namespace App.QueueService;

public sealed class QueuedHostedService(
        IBackgroundTaskQueue taskQueue,
        ILogger<QueuedHostedService> logger) : BackgroundService
{
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation("""
            {Name} is running.
            Tap W to add a work item to the 
            background queue.
            """,
            nameof(QueuedHostedService));

        return ProcessTaskQueueAsync(stoppingToken);
    }

    private async Task ProcessTaskQueueAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                Func<CancellationToken, ValueTask>? workItem =
                    await taskQueue.DequeueAsync(stoppingToken);

                await workItem(stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Prevent throwing if stoppingToken was signaled
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Error occurred executing task work item.");
            }
        }
    }

    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation(
            $"{nameof(QueuedHostedService)} is stopping.");

        await base.StopAsync(stoppingToken);
    }
}

每当在输入设备上选择 MonitorLoop 键时,w 服务将处理托管服务的排队任务:

  • IBackgroundTaskQueue 注入到 MonitorLoop 服务中。
  • 调用 IBackgroundTaskQueue.QueueBackgroundWorkItemAsync 来将工作项排入队列。
  • 工作项模拟长时间运行的后台任务:
namespace App.QueueService;

public sealed class MonitorLoop(
    IBackgroundTaskQueue taskQueue,
    ILogger<MonitorLoop> logger,
    IHostApplicationLifetime applicationLifetime)
{
    private readonly CancellationToken _cancellationToken = applicationLifetime.ApplicationStopping;

    public void StartMonitorLoop()
    {
        logger.LogInformation($"{nameof(MonitorAsync)} loop is starting.");

        // Run a console user input loop in a background thread
        Task.Run(async () => await MonitorAsync());
    }

    private async ValueTask MonitorAsync()
    {
        while (!_cancellationToken.IsCancellationRequested)
        {
            var keyStroke = Console.ReadKey();
            if (keyStroke.Key == ConsoleKey.W)
            {
                // Enqueue a background work item
                await taskQueue.QueueBackgroundWorkItemAsync(BuildWorkItemAsync);
            }
        }
    }

    private async ValueTask BuildWorkItemAsync(CancellationToken token)
    {
        // Simulate three 5-second tasks to complete
        // for each enqueued work item

        int delayLoop = 0;
        var guid = Guid.NewGuid();

        logger.LogInformation("Queued work item {Guid} is starting.", guid);

        while (!token.IsCancellationRequested && delayLoop < 3)
        {
            try
            {
                await Task.Delay(TimeSpan.FromSeconds(5), token);
            }
            catch (OperationCanceledException)
            {
                // Prevent throwing if the Delay is cancelled
            }

            ++ delayLoop;

            logger.LogInformation("Queued work item {Guid} is running. {DelayLoop}/3", guid, delayLoop);
        }

        if (delayLoop is 3)
        {
            logger.LogInformation("Queued Background Task {Guid} is complete.", guid);
        }
        else
        {
            logger.LogInformation("Queued Background Task {Guid} was cancelled.", guid);
        }
    }
}

将现有 Program 内容替换为以下 C# 代码:

using App.QueueService;

HostApplicationBuilder builder = Host.CreateApplicationBuilder(args);
builder.Services.AddSingleton<MonitorLoop>();
builder.Services.AddHostedService<QueuedHostedService>();
builder.Services.AddSingleton<IBackgroundTaskQueue>(_ => 
{
    if (!int.TryParse(builder.Configuration["QueueCapacity"], out var queueCapacity))
    {
        queueCapacity = 100;
    }

    return new DefaultBackgroundTaskQueue(queueCapacity);
});

IHost host = builder.Build();

MonitorLoop monitorLoop = host.Services.GetRequiredService<MonitorLoop>()!;
monitorLoop.StartMonitorLoop();

host.Run();

服务注册在 (Program.cs) 中。 托管服务通过 AddHostedService 扩展方法注册。 MonitorLoopProgram.cs 的顶级语句中启动:

MonitorLoop monitorLoop = host.Services.GetRequiredService<MonitorLoop>()!;
monitorLoop.StartMonitorLoop();

有关注册服务的详细信息,请参阅 .NET 中的依赖关系注入

验证服务功能

若要从 Visual Studio 运行应用程序,请选择 F5 或选择 “调试>开始调试 ”菜单选项。 如果使用 .NET CLI,请从工作目录中运行以下命令 dotnet run

dotnet run

有关 .NET CLI 运行命令的详细信息,请参阅 dotnet run

系统提示时,请输入 wW,并确保至少输入一次以排队模拟工作项,如示例输出所示:

info: App.QueueService.MonitorLoop[0]
      MonitorAsync loop is starting.
info: App.QueueService.QueuedHostedService[0]
      QueuedHostedService is running.

      Tap W to add a work item to the background queue.

info: Microsoft.Hosting.Lifetime[0]
      Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
      Hosting environment: Development
info: Microsoft.Hosting.Lifetime[0]
      Content root path: .\queue-service
winfo: App.QueueService.MonitorLoop[0]
      Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is starting.
info: App.QueueService.MonitorLoop[0]
      Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 1/3
info: App.QueueService.MonitorLoop[0]
      Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 2/3
info: App.QueueService.MonitorLoop[0]
      Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 3/3
info: App.QueueService.MonitorLoop[0]
      Queued Background Task 8453f845-ea4a-4bcb-b26e-c76c0d89303e is complete.
info: Microsoft.Hosting.Lifetime[0]
      Application is shutting down...
info: App.QueueService.QueuedHostedService[0]
      QueuedHostedService is stopping.

如果从 Visual Studio 中运行应用程序,请选择 “调试>停止调试...”。或者,从控制台窗口中选择 Ctrl + C 以发出取消信号。

另请参阅