创建队列服务
队列服务是很好的长时间运行服务示例,在此示例中,工作项可以排队并按顺序处理,因为之前的工作项已完成。 依靠辅助角色服务模板,在 之上 BackgroundService生成新功能。
在本教程中,你将了解如何执行以下操作:
- 创建队列服务。
- 将工作委托给任务队列。
- 从 IHostApplicationLifetime 事件注册控制台密钥侦听器。
提示
所有“.NET 中的辅助角色”示例源代码都可以在示例浏览器中下载。 有关详细信息,请参阅浏览代码示例:.NET 中的辅助角色。
先决条件
- .NET 7.0 SDK 或更高版本
- .NET 集成开发环境 (IDE)
- 随意使用 Visual Studio
- .NET 6.0 SDK 或更高版本
- .NET 集成开发环境 (IDE)
- 随意使用 Visual Studio
创建新项目
若要使用 Visual Studio 创建新的辅助角色服务项目,请选择“文件”“新建”“项目...”。从“创建新项目”对话框搜索“辅助角色服务”,并选择辅助角色服务模板。 如果你想要使用 .NET CLI,请在工作目录中打开你最喜欢的终端。 运行 dotnet new
命令,将 <Project.Name>
替换为所需的项目名称。
dotnet new worker --name <Project.Name>
有关 .NET CLI 新建辅助角色服务项目命令的详细信息,请参阅 dotnet new 辅助角色。
提示
如果使用 Visual Studio Code,则可以从集成终端运行 .NET CLI 命令。 有关详细信息,请参阅 Visual Studio Code:集成终端。
创建队列服务
你可能会熟悉 System.Web.Hosting
命名空间中的 QueueBackgroundWorkItem(Func<CancellationToken,Task>) 功能。
提示
命名空间的功能System.Web
未有意移植到 .NET,并且仍对.NET Framework独占。 有关详细信息,请参阅增量 ASP.NET 入门以 ASP.NET Core迁移。
在 .NET 中,若要为受 QueueBackgroundWorkItem
功能启发的服务建模,请首先向项目添加接口 IBackgroundTaskQueue
:
namespace App.QueueService;
public interface IBackgroundTaskQueue
{
ValueTask QueueBackgroundWorkItemAsync(
Func<CancellationToken, ValueTask> workItem);
ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken);
}
namespace App.QueueService;
public interface IBackgroundTaskQueue
{
ValueTask QueueBackgroundWorkItemAsync(
Func<CancellationToken, ValueTask> workItem);
ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken);
}
有两种方法,一种用于公开队列功能,另一种用于将之前排队的工作项移出队列。 工作项是一个 Func<CancellationToken, ValueTask>
。 接下来,将默认实现添加到项目。
using System.Threading.Channels;
namespace App.QueueService;
public sealed class DefaultBackgroundTaskQueue : IBackgroundTaskQueue
{
private readonly Channel<Func<CancellationToken, ValueTask>> _queue;
public DefaultBackgroundTaskQueue(int capacity)
{
BoundedChannelOptions options = new(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
_queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
}
public async ValueTask QueueBackgroundWorkItemAsync(
Func<CancellationToken, ValueTask> workItem)
{
if (workItem is null)
{
throw new ArgumentNullException(nameof(workItem));
}
await _queue.Writer.WriteAsync(workItem);
}
public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken)
{
Func<CancellationToken, ValueTask>? workItem =
await _queue.Reader.ReadAsync(cancellationToken);
return workItem;
}
}
using System.Threading.Channels;
namespace App.QueueService;
public sealed class DefaultBackgroundTaskQueue : IBackgroundTaskQueue
{
private readonly Channel<Func<CancellationToken, ValueTask>> _queue;
public DefaultBackgroundTaskQueue(int capacity)
{
BoundedChannelOptions options = new(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
_queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
}
public async ValueTask QueueBackgroundWorkItemAsync(
Func<CancellationToken, ValueTask> workItem)
{
if (workItem is null)
{
throw new ArgumentNullException(nameof(workItem));
}
await _queue.Writer.WriteAsync(workItem);
}
public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken)
{
Func<CancellationToken, ValueTask>? workItem =
await _queue.Reader.ReadAsync(cancellationToken);
return workItem;
}
}
上述实现依赖 Channel<T> 作为队列。 使用显式容量调用 BoundedChannelOptions(Int32)。 应根据预期的应用程序负载和访问队列的并发线程数进行容量设置。 BoundedChannelFullMode.Wait 导致对 ChannelWriter<T>.WriteAsync 的调用返回一个任务,该任务仅在空间可用时完成。 这会导致背压,以防过多的发布者/调用开始累积。
重写辅助角色类
在以下 QueueHostedService
示例中:
ProcessTaskQueueAsync
方法在ExecuteAsync
中返回 Task。- 在
ProcessTaskQueueAsync
中,取消排队并执行队列中的后台任务。 - 服务在
StopAsync
中停止之前,将等待工作项。
将现有 Worker
类替换为以下 C# 代码,并将该文件重命名为“QueueHostedService.cs”。
namespace App.QueueService;
public sealed class QueuedHostedService : BackgroundService
{
private readonly IBackgroundTaskQueue _taskQueue;
private readonly ILogger<QueuedHostedService> _logger;
public QueuedHostedService(
IBackgroundTaskQueue taskQueue,
ILogger<QueuedHostedService> logger) =>
(_taskQueue, _logger) = (taskQueue, logger);
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation(
$"{nameof(QueuedHostedService)} is running.{Environment.NewLine}" +
$"{Environment.NewLine}Tap W to add a work item to the " +
$"background queue.{Environment.NewLine}");
return ProcessTaskQueueAsync(stoppingToken);
}
private async Task ProcessTaskQueueAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
Func<CancellationToken, ValueTask>? workItem =
await _taskQueue.DequeueAsync(stoppingToken);
await workItem(stoppingToken);
}
catch (OperationCanceledException)
{
// Prevent throwing if stoppingToken was signaled
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred executing task work item.");
}
}
}
public override async Task StopAsync(CancellationToken stoppingToken)
{
_logger.LogInformation(
$"{nameof(QueuedHostedService)} is stopping.");
await base.StopAsync(stoppingToken);
}
}
namespace App.QueueService;
public sealed class QueuedHostedService : BackgroundService
{
private readonly IBackgroundTaskQueue _taskQueue;
private readonly ILogger<QueuedHostedService> _logger;
public QueuedHostedService(
IBackgroundTaskQueue taskQueue,
ILogger<QueuedHostedService> logger) =>
(_taskQueue, _logger) = (taskQueue, logger);
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation(
$"{nameof(QueuedHostedService)} is running.{Environment.NewLine}" +
$"{Environment.NewLine}Tap W to add a work item to the " +
$"background queue.{Environment.NewLine}");
return ProcessTaskQueueAsync(stoppingToken);
}
private async Task ProcessTaskQueueAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
Func<CancellationToken, ValueTask>? workItem =
await _taskQueue.DequeueAsync(stoppingToken);
await workItem(stoppingToken);
}
catch (OperationCanceledException)
{
// Prevent throwing if stoppingToken was signaled
}
catch (Exception ex)
{
_logger.LogError(ex, "Error occurred executing task work item.");
}
}
}
public override async Task StopAsync(CancellationToken stoppingToken)
{
_logger.LogInformation(
$"{nameof(QueuedHostedService)} is stopping.");
await base.StopAsync(stoppingToken);
}
}
每当在输入设备上选择 w
键时,MonitorLoop
服务将处理托管服务的排队任务:
IBackgroundTaskQueue
注入到MonitorLoop
服务中。- 调用
IBackgroundTaskQueue.QueueBackgroundWorkItemAsync
来将工作项排入队列。 - 工作项模拟长时间运行的后台任务:
- 将执行三次 5 秒的延迟 (Delay)。
- 如果任务已取消,
try-catch
语句将捕获 OperationCanceledException。
namespace App.QueueService;
public sealed class MonitorLoop
{
private readonly IBackgroundTaskQueue _taskQueue;
private readonly ILogger<MonitorLoop> _logger;
private readonly CancellationToken _cancellationToken;
public MonitorLoop(
IBackgroundTaskQueue taskQueue,
ILogger<MonitorLoop> logger,
IHostApplicationLifetime applicationLifetime)
{
_taskQueue = taskQueue;
_logger = logger;
_cancellationToken = applicationLifetime.ApplicationStopping;
}
public void StartMonitorLoop()
{
_logger.LogInformation($"{nameof(MonitorAsync)} loop is starting.");
// Run a console user input loop in a background thread
Task.Run(async () => await MonitorAsync());
}
private async ValueTask MonitorAsync()
{
while (!_cancellationToken.IsCancellationRequested)
{
var keyStroke = Console.ReadKey();
if (keyStroke.Key == ConsoleKey.W)
{
// Enqueue a background work item
await _taskQueue.QueueBackgroundWorkItemAsync(BuildWorkItemAsync);
}
}
}
private async ValueTask BuildWorkItemAsync(CancellationToken token)
{
// Simulate three 5-second tasks to complete
// for each enqueued work item
int delayLoop = 0;
var guid = Guid.NewGuid();
_logger.LogInformation("Queued work item {Guid} is starting.", guid);
while (!token.IsCancellationRequested && delayLoop < 3)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(5), token);
}
catch (OperationCanceledException)
{
// Prevent throwing if the Delay is cancelled
}
++ delayLoop;
_logger.LogInformation("Queued work item {Guid} is running. {DelayLoop}/3", guid, delayLoop);
}
string format = delayLoop switch
{
3 => "Queued Background Task {Guid} is complete.",
_ => "Queued Background Task {Guid} was cancelled."
};
_logger.LogInformation(format, guid);
}
}
namespace App.QueueService;
public sealed class MonitorLoop
{
private readonly IBackgroundTaskQueue _taskQueue;
private readonly ILogger<MonitorLoop> _logger;
private readonly CancellationToken _cancellationToken;
public MonitorLoop(
IBackgroundTaskQueue taskQueue,
ILogger<MonitorLoop> logger,
IHostApplicationLifetime applicationLifetime)
{
_taskQueue = taskQueue;
_logger = logger;
_cancellationToken = applicationLifetime.ApplicationStopping;
}
public void StartMonitorLoop()
{
_logger.LogInformation($"{nameof(MonitorAsync)} loop is starting.");
// Run a console user input loop in a background thread
Task.Run(async () => await MonitorAsync());
}
private async ValueTask MonitorAsync()
{
while (!_cancellationToken.IsCancellationRequested)
{
var keyStroke = Console.ReadKey();
if (keyStroke.Key == ConsoleKey.W)
{
// Enqueue a background work item
await _taskQueue.QueueBackgroundWorkItemAsync(BuildWorkItemAsync);
}
}
}
private async ValueTask BuildWorkItemAsync(CancellationToken token)
{
// Simulate three 5-second tasks to complete
// for each enqueued work item
int delayLoop = 0;
var guid = Guid.NewGuid();
_logger.LogInformation("Queued work item {Guid} is starting.", guid);
while (!token.IsCancellationRequested && delayLoop < 3)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(5), token);
}
catch (OperationCanceledException)
{
// Prevent throwing if the Delay is cancelled
}
++ delayLoop;
_logger.LogInformation("Queued work item {Guid} is running. {DelayLoop}/3", guid, delayLoop);
}
string format = delayLoop switch
{
3 => "Queued Background Task {Guid} is complete.",
_ => "Queued Background Task {Guid} was cancelled."
};
_logger.LogInformation(format, guid);
}
}
将现有 Program
内容替换为以下 C# 代码:
using App.QueueService;
HostApplicationBuilder builder = Host.CreateApplicationBuilder(args);
builder.Services.AddSingleton<MonitorLoop>();
builder.Services.AddHostedService<QueuedHostedService>();
builder.Services.AddSingleton<IBackgroundTaskQueue>(_ =>
{
if (!int.TryParse(builder.Configuration["QueueCapacity"], out var queueCapacity))
{
queueCapacity = 100;
}
return new DefaultBackgroundTaskQueue(queueCapacity);
});
IHost host = builder.Build();
MonitorLoop monitorLoop = host.Services.GetRequiredService<MonitorLoop>()!;
monitorLoop.StartMonitorLoop();
host.Run();
using App.QueueService;
IHostBuilder builder = Host.CreateDefaultBuilder(args)
.ConfigureServices((context, services) =>
{
services.AddSingleton<MonitorLoop>();
services.AddHostedService<QueuedHostedService>();
services.AddSingleton<IBackgroundTaskQueue>(_ =>
{
if (!int.TryParse(context.Configuration["QueueCapacity"],
out int queueCapacity))
{
queueCapacity = 100;
}
return new DefaultBackgroundTaskQueue(queueCapacity);
});
});
IHost host = builder.Build();
MonitorLoop monitorLoop = host.Services.GetRequiredService<MonitorLoop>()!;
monitorLoop.StartMonitorLoop();
host.Run();
这些服务在 (Program.cs) 中注册。 已使用 AddHostedService
扩展方法注册托管服务。 MonitorLoop
在 Program.cs 顶级语句中启动:
MonitorLoop monitorLoop = host.Services.GetRequiredService<MonitorLoop>()!;
monitorLoop.StartMonitorLoop();
MonitorLoop monitorLoop = host.Services.GetRequiredService<MonitorLoop>()!;
monitorLoop.StartMonitorLoop();
有关注册服务的详细信息,请参阅 .NET 中的依赖关系注入。
验证服务功能
若要从 Visual Studio 运行应用程序,请选择 F5 或选择“调试”>“开始调试”菜单选项。 如果使用的是 .NET CLI,请从工作目录运行 dotnet run
命令:
dotnet run
有关 .NET CLI run 命令的详细信息,请参阅 dotnet run。
出现提示时, w
输入 (或 W
至少) 一次以将模拟工作项排队,如示例输出中所示:
info: App.QueueService.MonitorLoop[0]
MonitorAsync loop is starting.
info: App.QueueService.QueuedHostedService[0]
QueuedHostedService is running.
Tap W to add a work item to the background queue.
info: Microsoft.Hosting.Lifetime[0]
Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
Hosting environment: Development
info: Microsoft.Hosting.Lifetime[0]
Content root path: .\queue-service
winfo: App.QueueService.MonitorLoop[0]
Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is starting.
info: App.QueueService.MonitorLoop[0]
Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 1/3
info: App.QueueService.MonitorLoop[0]
Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 2/3
info: App.QueueService.MonitorLoop[0]
Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 3/3
info: App.QueueService.MonitorLoop[0]
Queued Background Task 8453f845-ea4a-4bcb-b26e-c76c0d89303e is complete.
info: Microsoft.Hosting.Lifetime[0]
Application is shutting down...
info: App.QueueService.QueuedHostedService[0]
QueuedHostedService is stopping.
如果从 Visual Studio 内部运行应用程序,请选择“调试”>“停止调试...” 。或者,从控制台窗口中选择“Ctrl” + “C”,以发送取消信号。