队列服务是长期运行服务的一个绝佳示例,它可以在前一个工作项完成后按顺序对工作项进行排队和处理。 依靠 Worker 服务模板,在BackgroundService基础上构建新功能。
本教程中,您将学习如何:
- 创建队列服务。
- 将工作委托给任务队列。
- 从 IHostApplicationLifetime 事件中注册控制台按键监听器。
小窍门
所有“Workers in .NET”示例源代码都可以在样例代码浏览器下载。 有关详细信息,请参阅 “浏览代码示例:.NET 中的 Workers”。
先决条件
- .NET 8.0 SDK 或更高版本
- .NET 集成开发环境 (IDE)
- 随意使用 Visual Studio
创建新项目
若要使用 Visual Studio 创建新的辅助角色服务项目,请选择“ 文件>新建>项目...”。从“ 创建新项目 ”对话框搜索“辅助角色服务”,然后选择“辅助角色服务”模板。 如果想要使用 .NET CLI,请在工作目录中打开你喜欢的终端。 运行dotnet new命令,并将<Project.Name>替换为您想要的项目名称。
dotnet new worker --name <Project.Name>
有关 .NET CLI 新辅助角色服务项目命令的详细信息,请参阅 dotnet new worker。
小窍门
如果使用 Visual Studio Code,可以从集成终端运行 .NET CLI 命令。 有关详细信息,请参阅 Visual Studio Code:集成终端。
创建排队服务
你可能熟悉 QueueBackgroundWorkItem(Func<CancellationToken,Task>) 命名空间中的 System.Web.Hosting 功能。
小窍门
System.Web 命名空间的功能未被有意移植到 .NET,且仍然是 .NET Framework 独有的。 有关详细信息,请参阅 从 ASP.NET 开始逐步迁移到 ASP.NET Core。
在 .NET 中,若要为受 QueueBackgroundWorkItem 功能启发的服务建模,请首先向项目添加 IBackgroundTaskQueue 接口:
namespace App.QueueService;
public interface IBackgroundTaskQueue
{
ValueTask QueueBackgroundWorkItemAsync(
Func<CancellationToken, ValueTask> workItem);
ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken);
}
有两种方法,一种方法公开队列功能,另一种方法取消排队之前排队的工作项。
工作项是一个 Func<CancellationToken, ValueTask>。 接下来,将默认实现添加到项目。
using System.Threading.Channels;
namespace App.QueueService;
public sealed class DefaultBackgroundTaskQueue : IBackgroundTaskQueue
{
private readonly Channel<Func<CancellationToken, ValueTask>> _queue;
public DefaultBackgroundTaskQueue(int capacity)
{
BoundedChannelOptions options = new(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
_queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
}
public async ValueTask QueueBackgroundWorkItemAsync(
Func<CancellationToken, ValueTask> workItem)
{
ArgumentNullException.ThrowIfNull(workItem);
await _queue.Writer.WriteAsync(workItem);
}
public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken)
{
Func<CancellationToken, ValueTask>? workItem =
await _queue.Reader.ReadAsync(cancellationToken);
return workItem;
}
}
上述实现依赖于队列 Channel<T> 。 以显式容量调用 BoundedChannelOptions(Int32)。 应根据预期的应用程序负载和访问队列的并发线程数来设置容量。 BoundedChannelFullMode.Wait 导致调用 ChannelWriter<T>.WriteAsync 返回一个任务,该任务只有在有空间可用时才会完成。 这会导致回压,以防过多的发布者或调用开始累积。
重写 Worker 类
在以下 QueueHostedService 示例中:
- 该
ProcessTaskQueueAsync方法在ExecuteAsync中返回Task。 - 在
ProcessTaskQueueAsync中,取消排队并执行队列中的后台任务。 - 服务在
StopAsync中停止之前,将等待工作项。
将现有 Worker 类替换为以下 C# 代码,并将文件重命名为 QueueHostedService.cs。
namespace App.QueueService;
public sealed class QueuedHostedService(
IBackgroundTaskQueue taskQueue,
ILogger<QueuedHostedService> logger) : BackgroundService
{
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
logger.LogInformation("""
{Name} is running.
Tap W to add a work item to the
background queue.
""",
nameof(QueuedHostedService));
return ProcessTaskQueueAsync(stoppingToken);
}
private async Task ProcessTaskQueueAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
Func<CancellationToken, ValueTask>? workItem =
await taskQueue.DequeueAsync(stoppingToken);
await workItem(stoppingToken);
}
catch (OperationCanceledException)
{
// Prevent throwing if stoppingToken was signaled
}
catch (Exception ex)
{
logger.LogError(ex, "Error occurred executing task work item.");
}
}
}
public override async Task StopAsync(CancellationToken stoppingToken)
{
logger.LogInformation(
$"{nameof(QueuedHostedService)} is stopping.");
await base.StopAsync(stoppingToken);
}
}
每当在输入设备上选择 MonitorLoop 键时,w 服务将处理托管服务的排队任务:
-
IBackgroundTaskQueue注入到MonitorLoop服务中。 - 调用
IBackgroundTaskQueue.QueueBackgroundWorkItemAsync来将工作项排入队列。 - 工作项模拟长时间运行的后台任务:
- 执行了三个 5 秒的延迟Delay。
- 如果任务被取消,则
try-catch语句会捕获 OperationCanceledException。
namespace App.QueueService;
public sealed class MonitorLoop(
IBackgroundTaskQueue taskQueue,
ILogger<MonitorLoop> logger,
IHostApplicationLifetime applicationLifetime)
{
private readonly CancellationToken _cancellationToken = applicationLifetime.ApplicationStopping;
public void StartMonitorLoop()
{
logger.LogInformation($"{nameof(MonitorAsync)} loop is starting.");
// Run a console user input loop in a background thread
Task.Run(async () => await MonitorAsync());
}
private async ValueTask MonitorAsync()
{
while (!_cancellationToken.IsCancellationRequested)
{
var keyStroke = Console.ReadKey();
if (keyStroke.Key == ConsoleKey.W)
{
// Enqueue a background work item
await taskQueue.QueueBackgroundWorkItemAsync(BuildWorkItemAsync);
}
}
}
private async ValueTask BuildWorkItemAsync(CancellationToken token)
{
// Simulate three 5-second tasks to complete
// for each enqueued work item
int delayLoop = 0;
var guid = Guid.NewGuid();
logger.LogInformation("Queued work item {Guid} is starting.", guid);
while (!token.IsCancellationRequested && delayLoop < 3)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(5), token);
}
catch (OperationCanceledException)
{
// Prevent throwing if the Delay is cancelled
}
++ delayLoop;
logger.LogInformation("Queued work item {Guid} is running. {DelayLoop}/3", guid, delayLoop);
}
if (delayLoop is 3)
{
logger.LogInformation("Queued Background Task {Guid} is complete.", guid);
}
else
{
logger.LogInformation("Queued Background Task {Guid} was cancelled.", guid);
}
}
}
将现有 Program 内容替换为以下 C# 代码:
using App.QueueService;
HostApplicationBuilder builder = Host.CreateApplicationBuilder(args);
builder.Services.AddSingleton<MonitorLoop>();
builder.Services.AddHostedService<QueuedHostedService>();
builder.Services.AddSingleton<IBackgroundTaskQueue>(_ =>
{
if (!int.TryParse(builder.Configuration["QueueCapacity"], out var queueCapacity))
{
queueCapacity = 100;
}
return new DefaultBackgroundTaskQueue(queueCapacity);
});
IHost host = builder.Build();
MonitorLoop monitorLoop = host.Services.GetRequiredService<MonitorLoop>()!;
monitorLoop.StartMonitorLoop();
host.Run();
服务注册在 (Program.cs) 中。 托管服务通过 AddHostedService 扩展方法注册。
MonitorLoop 在 Program.cs 的顶级语句中启动:
MonitorLoop monitorLoop = host.Services.GetRequiredService<MonitorLoop>()!;
monitorLoop.StartMonitorLoop();
有关注册服务的详细信息,请参阅 .NET 中的依赖关系注入。
验证服务功能
若要从 Visual Studio 运行应用程序,请选择 F5 或选择 “调试>开始调试 ”菜单选项。 如果使用 .NET CLI,请从工作目录中运行以下命令 dotnet run :
dotnet run
有关 .NET CLI 运行命令的详细信息,请参阅 dotnet run。
系统提示时,请输入 w 或 W,并确保至少输入一次以排队模拟工作项,如示例输出所示:
info: App.QueueService.MonitorLoop[0]
MonitorAsync loop is starting.
info: App.QueueService.QueuedHostedService[0]
QueuedHostedService is running.
Tap W to add a work item to the background queue.
info: Microsoft.Hosting.Lifetime[0]
Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
Hosting environment: Development
info: Microsoft.Hosting.Lifetime[0]
Content root path: .\queue-service
winfo: App.QueueService.MonitorLoop[0]
Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is starting.
info: App.QueueService.MonitorLoop[0]
Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 1/3
info: App.QueueService.MonitorLoop[0]
Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 2/3
info: App.QueueService.MonitorLoop[0]
Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 3/3
info: App.QueueService.MonitorLoop[0]
Queued Background Task 8453f845-ea4a-4bcb-b26e-c76c0d89303e is complete.
info: Microsoft.Hosting.Lifetime[0]
Application is shutting down...
info: App.QueueService.QueuedHostedService[0]
QueuedHostedService is stopping.
如果从 Visual Studio 中运行应用程序,请选择 “调试>停止调试...”。或者,从控制台窗口中选择 Ctrl + C 以发出取消信号。