培训
模块
在 .NET Aspire 项目中使用 RabbitMQ 发送消息 - Training
本模块介绍 RabbitMQ 消息代理以及如何使用它来解耦微服务,同时确保它们能够可靠地通信。 此外还介绍如何通过 .NET Aspire 轻松地与 RabbitMQ 集成。
队列服务是很好的长时间运行服务示例,在此示例中,工作项可以排队并按顺序处理,因为之前的工作项已完成。 依靠辅助角色服务模板,可在 BackgroundService 上构建一些新功能。
在本教程中,你将了解如何执行以下操作:
提示
所有“.NET 中的辅助角色”示例源代码都可以在示例浏览器中下载。 有关详细信息,请参阅浏览代码示例:.NET 中的辅助角色。
若要使用 Visual Studio 创建新的辅助角色服务项目,请选择“文件”“新建”“项目...”。从“创建新项目”对话框搜索“辅助角色服务”,并选择辅助角色服务模板。 如果你想要使用 .NET CLI,请在工作目录中打开你最喜欢的终端。 运行 dotnet new
命令,将 <Project.Name>
替换为所需的项目名称。
dotnet new worker --name <Project.Name>
有关 .NET CLI 新建辅助角色服务项目命令的详细信息,请参阅 dotnet new 辅助角色。
提示
如果使用 Visual Studio Code,则可以从集成终端运行 .NET CLI 命令。 有关详细信息,请参阅 Visual Studio Code:集成终端。
你可能会熟悉 System.Web.Hosting
命名空间中的 QueueBackgroundWorkItem(Func<CancellationToken,Task>) 功能。
提示
System.Web
命名空间的功能故意未移植到 .NET,它仍是 .NET Framework 独有的功能。 有关详细信息,请参阅开始进行从 ASP.NET 到 ASP.NET Core 的增量迁移。
在 .NET 中,若要为受 QueueBackgroundWorkItem
功能激发的服务建模,请先将 IBackgroundTaskQueue
接口添加到项目中:
namespace App.QueueService;
public interface IBackgroundTaskQueue
{
ValueTask QueueBackgroundWorkItemAsync(
Func<CancellationToken, ValueTask> workItem);
ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken);
}
有两种方法,一种用于公开队列功能,另一种用于将之前排队的工作项移出队列。 工作项是一个 Func<CancellationToken, ValueTask>
。 接下来,将默认实现添加到项目。
using System.Threading.Channels;
namespace App.QueueService;
public sealed class DefaultBackgroundTaskQueue : IBackgroundTaskQueue
{
private readonly Channel<Func<CancellationToken, ValueTask>> _queue;
public DefaultBackgroundTaskQueue(int capacity)
{
BoundedChannelOptions options = new(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
_queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
}
public async ValueTask QueueBackgroundWorkItemAsync(
Func<CancellationToken, ValueTask> workItem)
{
ArgumentNullException.ThrowIfNull(workItem);
await _queue.Writer.WriteAsync(workItem);
}
public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken)
{
Func<CancellationToken, ValueTask>? workItem =
await _queue.Reader.ReadAsync(cancellationToken);
return workItem;
}
}
上述实现依赖 Channel<T> 作为队列。 使用显式容量调用 BoundedChannelOptions(Int32)。 应根据预期的应用程序负载和访问队列的并发线程数进行容量设置。 BoundedChannelFullMode.Wait 将导致调用 ChannelWriter<T>.WriteAsync 返回一个任务,该任务仅在空间可用时才会完成。 这会导致背压,以防过多的发布服务器/调用开始累积。
在以下 QueueHostedService
示例中:
ProcessTaskQueueAsync
方法在 ExecuteAsync
中返回 Task。ProcessTaskQueueAsync
中,取消排队并执行队列中的后台任务。StopAsync
中停止之前,将等待工作项。将现有 Worker
类替换为以下 C# 代码,并将该文件重命名为“QueueHostedService.cs”。
namespace App.QueueService;
public sealed class QueuedHostedService(
IBackgroundTaskQueue taskQueue,
ILogger<QueuedHostedService> logger) : BackgroundService
{
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
logger.LogInformation("""
{Name} is running.
Tap W to add a work item to the
background queue.
""",
nameof(QueuedHostedService));
return ProcessTaskQueueAsync(stoppingToken);
}
private async Task ProcessTaskQueueAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
Func<CancellationToken, ValueTask>? workItem =
await taskQueue.DequeueAsync(stoppingToken);
await workItem(stoppingToken);
}
catch (OperationCanceledException)
{
// Prevent throwing if stoppingToken was signaled
}
catch (Exception ex)
{
logger.LogError(ex, "Error occurred executing task work item.");
}
}
}
public override async Task StopAsync(CancellationToken stoppingToken)
{
logger.LogInformation(
$"{nameof(QueuedHostedService)} is stopping.");
await base.StopAsync(stoppingToken);
}
}
每当在输入设备上选择 w
键时,MonitorLoop
服务将处理托管服务的排队任务:
IBackgroundTaskQueue
注入到 MonitorLoop
服务中。IBackgroundTaskQueue.QueueBackgroundWorkItemAsync
来将工作项排入队列。try-catch
语句将捕获 OperationCanceledException。namespace App.QueueService;
public sealed class MonitorLoop(
IBackgroundTaskQueue taskQueue,
ILogger<MonitorLoop> logger,
IHostApplicationLifetime applicationLifetime)
{
private readonly CancellationToken _cancellationToken = applicationLifetime.ApplicationStopping;
public void StartMonitorLoop()
{
logger.LogInformation($"{nameof(MonitorAsync)} loop is starting.");
// Run a console user input loop in a background thread
Task.Run(async () => await MonitorAsync());
}
private async ValueTask MonitorAsync()
{
while (!_cancellationToken.IsCancellationRequested)
{
var keyStroke = Console.ReadKey();
if (keyStroke.Key == ConsoleKey.W)
{
// Enqueue a background work item
await taskQueue.QueueBackgroundWorkItemAsync(BuildWorkItemAsync);
}
}
}
private async ValueTask BuildWorkItemAsync(CancellationToken token)
{
// Simulate three 5-second tasks to complete
// for each enqueued work item
int delayLoop = 0;
var guid = Guid.NewGuid();
logger.LogInformation("Queued work item {Guid} is starting.", guid);
while (!token.IsCancellationRequested && delayLoop < 3)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(5), token);
}
catch (OperationCanceledException)
{
// Prevent throwing if the Delay is cancelled
}
++ delayLoop;
logger.LogInformation("Queued work item {Guid} is running. {DelayLoop}/3", guid, delayLoop);
}
if (delayLoop is 3)
{
logger.LogInformation("Queued Background Task {Guid} is complete.", guid);
}
else
{
logger.LogInformation("Queued Background Task {Guid} was cancelled.", guid);
}
}
}
将现有 Program
内容替换为以下 C# 代码:
using App.QueueService;
HostApplicationBuilder builder = Host.CreateApplicationBuilder(args);
builder.Services.AddSingleton<MonitorLoop>();
builder.Services.AddHostedService<QueuedHostedService>();
builder.Services.AddSingleton<IBackgroundTaskQueue>(_ =>
{
if (!int.TryParse(builder.Configuration["QueueCapacity"], out var queueCapacity))
{
queueCapacity = 100;
}
return new DefaultBackgroundTaskQueue(queueCapacity);
});
IHost host = builder.Build();
MonitorLoop monitorLoop = host.Services.GetRequiredService<MonitorLoop>()!;
monitorLoop.StartMonitorLoop();
host.Run();
已在 Program.cs 中注册这些服务。 已使用 AddHostedService
扩展方法注册托管服务。 MonitorLoop
在 Program.cs 顶级语句中启动:
MonitorLoop monitorLoop = host.Services.GetRequiredService<MonitorLoop>()!;
monitorLoop.StartMonitorLoop();
有关注册服务的详细信息,请参阅 .NET 中的依赖关系注入。
若要从 Visual Studio 运行应用程序,请选择 F5 或选择“调试”>“开始调试”菜单选项。 如果使用的是 .NET CLI,请从工作目录运行 dotnet run
命令:
dotnet run
有关 .NET CLI run 命令的详细信息,请参阅 dotnet run。
出现提示时,请输入 w
(或 W
)至少一次以将模拟工作项加入队列,如示例输出中所示:
info: App.QueueService.MonitorLoop[0]
MonitorAsync loop is starting.
info: App.QueueService.QueuedHostedService[0]
QueuedHostedService is running.
Tap W to add a work item to the background queue.
info: Microsoft.Hosting.Lifetime[0]
Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
Hosting environment: Development
info: Microsoft.Hosting.Lifetime[0]
Content root path: .\queue-service
winfo: App.QueueService.MonitorLoop[0]
Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is starting.
info: App.QueueService.MonitorLoop[0]
Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 1/3
info: App.QueueService.MonitorLoop[0]
Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 2/3
info: App.QueueService.MonitorLoop[0]
Queued work item 8453f845-ea4a-4bcb-b26e-c76c0d89303e is running. 3/3
info: App.QueueService.MonitorLoop[0]
Queued Background Task 8453f845-ea4a-4bcb-b26e-c76c0d89303e is complete.
info: Microsoft.Hosting.Lifetime[0]
Application is shutting down...
info: App.QueueService.QueuedHostedService[0]
QueuedHostedService is stopping.
如果从 Visual Studio 内部运行应用程序,请选择“调试”>“停止调试...” 。或者,从控制台窗口中选择“Ctrl” + “C”,以发送取消信号。
培训
模块
在 .NET Aspire 项目中使用 RabbitMQ 发送消息 - Training
本模块介绍 RabbitMQ 消息代理以及如何使用它来解耦微服务,同时确保它们能够可靠地通信。 此外还介绍如何通过 .NET Aspire 轻松地与 RabbitMQ 集成。