创建队列服务

队列服务是很好的长时间运行服务示例,在此示例中,工作项可以排队并按顺序处理,因为之前的工作项已完成。 依靠辅助角色服务模板,可在 BackgroundService 上构建一些新功能。

在本教程中,你将了解如何执行以下操作:

  • 创建队列服务。
  • 将工作委托给任务队列。
  • IHostApplicationLifetime 事件注册控制台密钥侦听器。

提示

所有“.NET 中的辅助角色”示例源代码都可以在示例浏览器中下载。 有关详细信息,请参阅浏览代码示例:.NET 中的辅助角色

先决条件

创建新项目

若要使用 Visual Studio 创建新的辅助角色服务项目,请选择“文件”“新建”“项目...”。从“创建新项目”对话框搜索“辅助角色服务”,并选择辅助角色服务模板。 如果你想要使用 .NET CLI,请在工作目录中打开你最喜欢的终端。 运行 dotnet new 命令,将 <Project.Name> 替换为所需的项目名称。

dotnet new worker --name <Project.Name>

有关 .NET CLI 新建辅助角色服务项目命令的详细信息,请参阅 dotnet new 辅助角色

提示

如果使用 Visual Studio Code,则可以从集成终端运行 .NET CLI 命令。 有关详细信息,请参阅 Visual Studio Code:集成终端

创建队列服务

你可能会熟悉 System.Web.Hosting 命名空间中的 QueueBackgroundWorkItem(Func<CancellationToken,Task>) 功能。

提示

System.Web 命名空间的功能故意未移植到 .NET,它仍是 .NET Framework 独有的功能。 有关详细信息,请参阅开始进行从 ASP.NET 到 ASP.NET Core 的增量迁移

在 .NET 中,若要为受 QueueBackgroundWorkItem 功能激发的服务建模,请先将 IBackgroundTaskQueue 接口添加到项目中:

namespace App.QueueService;

public interface IBackgroundTaskQueue
{
    ValueTask QueueBackgroundWorkItemAsync(
        Func<CancellationToken, ValueTask> workItem);

    ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken);
}

有两种方法,一种用于公开队列功能,另一种用于将之前排队的工作项移出队列。 工作项是一个 Func<CancellationToken, ValueTask>。 接下来,将默认实现添加到项目。

using System.Threading.Channels;

namespace App.QueueService;

public sealed class DefaultBackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

    public DefaultBackgroundTaskQueue(int capacity)
    {
        BoundedChannelOptions options = new(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
    }

    public async ValueTask QueueBackgroundWorkItemAsync(
        Func<CancellationToken, ValueTask> workItem)
    {
        ArgumentNullException.ThrowIfNull(workItem);

        await _queue.Writer.WriteAsync(workItem);
    }

    public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken)
    {
        Func<CancellationToken, ValueTask>? workItem =
            await _queue.Reader.ReadAsync(cancellationToken);

        return workItem;
    }
}

上述实现依赖 Channel<T> 作为队列。 使用显式容量调用 BoundedChannelOptions(Int32)。 应根据预期的应用程序负载和访问队列的并发线程数进行容量设置。 BoundedChannelFullMode.Wait 将导致调用 ChannelWriter<T>.WriteAsync 返回一个任务,该任务仅在空间可用时才会完成。 这会导致背压,以防过多的发布服务器/调用开始累积。

重写辅助角色类

在以下 QueueHostedService 示例中:

  • ProcessTaskQueueAsync 方法在 ExecuteAsync 中返回 Task
  • ProcessTaskQueueAsync 中,取消排队并执行队列中的后台任务。
  • 服务在 StopAsync 中停止之前,将等待工作项。

将现有 Worker 类替换为以下 C# 代码,并将该文件重命名为“QueueHostedService.cs”。

namespace App.QueueService;

public sealed class QueuedHostedService(
        IBackgroundTaskQueue taskQueue,
        ILogger<QueuedHostedService> logger) : BackgroundService
{
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation("""
            {Name} is running.
            Tap W to add a work item to the 
            background queue.
            """,
            nameof(QueuedHostedService));

        return ProcessTaskQueueAsync(stoppingToken);
    }

    private async Task ProcessTaskQueueAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                Func<CancellationToken, ValueTask>? workItem =
                    await taskQueue.DequeueAsync(stoppingToken);

                await workItem(stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Prevent throwing if stoppingToken was signaled
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Error occurred executing task work item.");
            }
        }
    }

    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation(
            $"{nameof(QueuedHostedService)} is stopping.");

        await base.StopAsync(stoppingToken);
    }
}

每当在输入设备上选择 w 键时,MonitorLoop 服务将处理托管服务的排队任务:

  • IBackgroundTaskQueue 注入到 MonitorLoop 服务中。
  • 调用 IBackgroundTaskQueue.QueueBackgroundWorkItemAsync 来将工作项排入队列。
  • 工作项模拟长时间运行的后台任务:
namespace App.QueueService;

public sealed class MonitorLoop(
    IBackgroundTaskQueue taskQueue,
    ILogger<MonitorLoop> logger,
    IHostApplicationLifetime applicationLifetime)
{
    private readonly CancellationToken _cancellationToken = applicationLifetime.ApplicationStopping;

    public void StartMonitorLoop()
    {
        logger.LogInformation($"{nameof(MonitorAsync)} loop is starting.");

        // Run a console user input loop in a background thread
        Task.Run(async () => await MonitorAsync());
    }

    private async ValueTask MonitorAsync()
    {
        while (!_cancellationToken.IsCancellationRequested)
        {
            var keyStroke = Console.ReadKey();
            if (keyStroke.Key == ConsoleKey.W)
            {
                // Enqueue a background work item
                await taskQueue.QueueBackgroundWorkItemAsync(BuildWorkItemAsync);
            }
        }
    }

    private async ValueTask BuildWorkItemAsync(CancellationToken token)
    {
        // Simulate three 5-second tasks to complete
        // for each enqueued work item

        int delayLoop = 0;
        var guid = Guid.NewGuid();

        logger.LogInformation("Queued work item {Guid} is starting.", guid);

        while (!token.IsCancellationRequested && delayLoop < 3)
        {
            try
            {
                await Task.Delay(TimeSpan.FromSeconds(5), token);
            }
            catch (OperationCanceledException)
            {
                // Prevent throwing if the Delay is cancelled
            }

            ++ delayLoop;

            logger.LogInformation("Queued work item {Guid} is running. {DelayLoop}/3", guid, delayLoop);
        }

        if (delayLoop is 3)
        {
            logger.LogInformation("Queued Background Task {Guid} is complete.", guid);
        }
        else
        {
            logger.LogInformation("Queued Background Task {Guid} was cancelled.", guid);
        }
    }
}

将现有 Program 内容替换为以下 C# 代码:

using App.QueueService;

HostApplicationBuilder builder = Host.CreateApplicationBuilder(args);
builder.Services.AddSingleton<MonitorLoop>();
builder.Services.AddHostedService<QueuedHostedService>();
builder.Services.AddSingleton<IBackgroundTaskQueue>(_ => 
{
    if (!int.TryParse(builder.Configuration["QueueCapacity"], out var queueCapacity))
    {
        queueCapacity = 100;
    }

    return new DefaultBackgroundTaskQueue(queueCapacity);
});

IHost host = builder.Build();

MonitorLoop monitorLoop = host.Services.GetRequiredService<MonitorLoop>()!;
monitorLoop.StartMonitorLoop();

host.Run();

已在 Program.cs 中注册这些服务。 已使用 AddHostedService 扩展方法注册托管服务。 MonitorLoop 在 Program.cs 顶级语句中启动:

MonitorLoop monitorLoop = host.Services.GetRequiredService<MonitorLoop>()!;
monitorLoop.StartMonitorLoop();

有关注册服务的详细信息,请参阅 .NET 中的依赖关系注入

验证服务功能

若要从 Visual Studio 运行应用程序,请选择 F5 或选择“调试”>“开始调试”菜单选项。 如果使用的是 .NET CLI,请从工作目录运行 dotnet run 命令:

dotnet run

有关 .NET CLI run 命令的详细信息,请参阅 dotnet run

出现提示时,请输入 w(或 W)至少一次以将模拟工作项加入队列,如示例输出中所示:

info: App.QueueService.MonitorLoop[0]
      MonitorAsync loop is starting.
info: App.QueueService.QueuedHostedService[0]
      QueuedHostedService is running.

      Tap W to add a work item to the background queue.

info: Microsoft.Hosting.Lifetime[0]
      Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
      Hosting environment: Development
info: Microsoft.Hosting.Lifetime[0]
      Content root path: .\queue-service
winfo: App.QueueService.MonitorLoop[0]
      Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is starting.
info: App.QueueService.MonitorLoop[0]
      Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 1/3
info: App.QueueService.MonitorLoop[0]
      Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 2/3
info: App.QueueService.MonitorLoop[0]
      Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 3/3
info: App.QueueService.MonitorLoop[0]
      Queued Background Task 8453f845-ea4a-4bcb-b26e-c76c0d89303e is complete.
info: Microsoft.Hosting.Lifetime[0]
      Application is shutting down...
info: App.QueueService.QueuedHostedService[0]
      QueuedHostedService is stopping.

如果从 Visual Studio 内部运行应用程序,请选择“调试”>“停止调试...” 。或者,从控制台窗口中选择“Ctrl” + “C”,以发送取消信号。

另请参阅