你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
快速入门 - 向/从 Azure 服务总线队列 (.NET) 发送/接收消息
在本快速入门中,你将执行以下步骤:
使用 Azure 门户创建服务总线命名空间。
使用 Azure 门户创建服务总线队列。
编写 .NET 控制台应用程序,向队列发送一组消息。
编写 .NET 控制台应用程序,从队列接收这些消息。
注意
本快速入门分步介绍了如何实施一个简单方案,将一批消息发送到服务总线队列并接收这些消息。 有关 .NET 客户端库的概述,请参阅适用于 .NET 的 Azure 服务总线客户端库。 有关更多示例,请参阅 GitHub 上的服务总线 .NET 示例。
先决条件
如果你是首次使用该服务,请在使用本快速入门之前先参阅服务总线概述。
- Azure 订阅。 若要使用 Azure 服务(包括 Azure 服务总线),需要一个订阅。 如果没有现成的 Azure 帐户,可以注册免费试用版。
- Visual Studio 2022。 示例应用程序利用 C# 10 中引入的新功能。 你仍可使用以前的 C# 语言版本的服务总线客户端库,但语法可能会有所不同。 要使用最新语法,建议安装 .NET 6.0 或更高版本,并将语言版本设置为
latest
。 如果使用 Visual Studio,Visual Studio 2022 以前的版本与生成 C# 10 项目时所需的工具将不兼容。
在 Azure 门户中创建命名空间
若要开始在 Azure 中使用服务总线消息实体,必须先使用在 Azure 中唯一的名称创建一个命名空间。 命名空间提供了用于应用程序中的服务总线资源(队列、主题等)的范围容器。
创建命名空间:
登录 Azure 门户。
导航到“所有服务”页。
在左侧的导航栏中,从类别列表中选择“集成”,将鼠标悬停在“服务总线”上,然后选择“服务总线”磁贴上的 + 按钮。。
在“创建命名空间”页的“基本信息”标记中,执行以下步骤 :
对于“订阅”,请选择要在其中创建命名空间的 Azure 订阅。
对于“资源组”,请选择该命名空间驻留到的现有资源组,或创建一个新资源组。
输入命名空间的名称。 命名空间名称应遵循以下命名约定:
- 该名称在 Azure 中必须唯一。 系统会立即检查该名称是否可用。
- 名称长度最少为 6 个字符,最多为 50 个字符。
- 名称只能包含字母、数字、连字符“-”。
- 名称必须以字母开头,并以字母或数字结尾。
- 名称不以“-sb”或“-mgmt”结尾。
对于“位置”,请选择托管该命名空间的区域。
对于“定价层”,请选择命名空间的定价层(“基本”、“标准”或“高级”)。 对于本快速入门,请选择“标准”。
重要
若要使用主题和订阅,请选择“标准”或“高级”。 基本定价层不支持主题/订阅。
如果选择了“高级”定价层,请指定“消息传送单元”数 。 高级层在 CPU 和内存级别提供资源隔离,使每个工作负荷在隔离的环境中运行。 此资源容器称为消息传送单元。 高级命名空间至少具有一个消息传送单元。 可为每个服务总线高级命名空间选择 1、2、4、8 或 16 个消息传送单元。 有关详细信息,请参阅服务总线高级消息传送。
在页面底部选择“查看 + 创建”。
在“查看 + 创建”页上,查看设置,然后选择“创建” 。
资源部署成功后,在部署页上选择“转到资源”。
将会看到服务总线命名空间的主页。
在 Azure 门户中创建队列
在“服务总线命名空间”页面上,选择左侧导航菜单中的“队列”。
在“队列”页面上,选择工具栏上的“+ 队列”。
输入队列名称,其他值则保留默认值。
现在选择“创建”。
重要
如果不熟悉 Azure,你可能会感觉“连接字符串”选项更易于使用。 选择“连接字符串”选项卡,以查看本快速入门中有关使用连接字符串的说明。 建议在实际应用程序和生产环境中使用“无密码”选项。
向 Azure 验证应用
本快速入门介绍了连接到 Azure 服务总线的两种方法:无密码和连接字符串方法。
第一个选项展示如何使用 Microsoft Entra ID 中的安全主体和基于角色的访问控制 (RBAC) 连接到服务总线命名空间。 无需担心代码、配置文件或安全存储(如 Azure Key Vault)中存在硬编码的连接字符串。
第二个选项展示如何使用连接字符串连接到服务总线命名空间。 如果不熟悉 Azure,你可能会感觉连接字符串选项更易于使用。 建议在实际应用程序和生产环境中使用无密码选项。 有关详细信息,请参阅身份验证和授权。 还可以在概述页上阅读有关无密码身份验证的详细信息。
将角色分配到 Microsoft Entra 用户
在本地开发时,请确保连接到 Azure 服务总线的用户帐户具有正确的权限。 你需要拥有 Azure 服务总线数据所有者角色才能发送和接收消息。 若要为自己分配此角色,需要具有“用户访问管理员”角色,或者具有包含 Microsoft.Authorization/roleAssignments/write
操作的其他角色。 可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 可在范围概述页上详细了解角色分配的可用范围。
以下示例将 Azure Service Bus Data Owner
角色分配给用户帐户,该角色提供对 Azure 服务总线资源的完全访问权限。 在实际方案中,遵循最小特权原则,仅向用户提供更安全的生产环境所需的最小权限。
适用于 Azure 服务总线的 Azure 内置角色
对于 Azure 服务总线,通过 Azure 门户和 Azure 资源管理 API 对命名空间和所有相关资源的管理已使用 Azure RBAC 模型进行了保护。 Azure 提供以下 Azure 内置角色,用于授予对服务总线命名空间的访问权限:
- Azure 服务总线数据所有者:允许对服务总线命名空间及其实体(队列、主题、订阅和筛选器)进行数据访问。 此角色的成员可以从队列或主题/订阅发送和接收消息。
- Azure 服务总线数据发送者:使用此角色可以为服务总线命名空间及其实体提供发送访问权限。
- Azure 服务总线数据接收者:使用此角色可以为服务总线命名空间及其实体提供接收访问权限。
如果要创建自定义角色,请参阅执行服务总线操作所需的权限。
将 Microsoft Entra 用户添加到“Azure 服务总线所有者”角色
将 Microsoft Entra 用户名添加到服务总线命名空间级别的 Azure 服务总线数据所有者角色。 它将允许在用户帐户上下文中运行的应用将消息发送到队列或主题,并从队列或主题的订阅中接收消息。
重要
在大多数情况下,角色分配在 Azure 中传播需要一两分钟。 在极少数情况下,最多可能需要 8 分钟。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。
如果未在 Azure 门户中打开“服务总线命名空间”页,请使用主搜索栏或左侧导航找到你的服务总线命名空间。
在概述页面上,从左侧菜单中选择“访问控制(IAM)”。
在“访问控制 (IAM)”页上,选择“角色分配”选项卡。
从顶部菜单中选择“+ 添加”,然后从出现的下拉菜单中选择“添加角色分配”。
使用搜索框将结果筛选为所需角色。 对于此示例,请搜索
Azure Service Bus Data Owner
并选择匹配的结果。 然后选择“下一步” 。在“访问权限分配对象”下,选择“用户、组或服务主体”,然后选择“+ 选择成员”。
在对话框中,搜索 Microsoft Entra ID 用户名(通常是 user@domain 电子邮件地址),然后选中对话框底部的“选择”。
选择“查看 + 分配”转到最后一页,然后再次选择“查看 + 分配”完成该过程。
启动 Visual Studio 并登录到 Azure
可以使用以下步骤授权对服务总线命名空间的访问:
启动 Visual Studio。 如果看到“入门”窗口,请在右窗格中选择“在不使用代码的情况下继续”链接。
选择 Visual Studio 右上角的“登录”按钮。
使用你之前为其分配角色的 Microsoft Entra 帐户登录。
将消息发送到队列
本部分介绍如何创建一个向服务总线队列发送消息的 .NET 控制台应用程序。
注意
本快速入门分步介绍了如何实施一个简单方案,将一批消息发送到服务总线队列并接收这些消息。 有关其他和高级方案的更多示例,请参阅 GitHub 上的服务总线 .NET 示例。
创建控制台应用程序
在 Visual Studio 中,选择“文件”->“新建”->“项目”菜单。
在“创建新项目”对话框中执行以下步骤:如果看不到此对话框,请在菜单中选择“文件”,然后依次选择“新建”、“项目”。
选择“C#”作为编程语言。
选择“控制台”作为应用程序类型。
从结果列表中选择“控制台应用”。
然后,选择“下一步” 。
输入 QueueSender 作为项目名称,输入 ServiceBusQueueQuickStart 作为解决方案名称,然后选择下一步。
在“其他信息”页面,选择“创建”来创建解决方案和项目。
向项目添加 NuGet 包
在菜单中选择“工具”>“NuGet 包管理器”>“包管理器控制台”。
运行以下命令以安装 Azure.Messaging.ServiceBus NuGet 包。
Install-Package Azure.Messaging.ServiceBus
运行以下命令以安装 Azure.Identity NuGet 包。
Install-Package Azure.Identity
添加将消息发送到队列的代码
将
Program.cs
的内容替换为以下代码。 以下部分概述了重要步骤,并在代码注释中提供了其他信息。- 使用
DefaultAzureCredential
对象创建 ServiceBusClient 对象。DefaultAzureCredential
将自动发现并使用你的 Visual Studio 登录凭据向 Azure 服务总线进行身份验证。 - 对 ServiceBusClient 对象调用 CreateSender 方法,从而为特定的“服务总线”队列创建 ServiceBusSender 对象。
- 使用 ServiceBusSender.CreateMessageBatchAsync 方法创建 ServiceBusMessageBatch 对象。
- 使用 ServiceBusMessageBatch.TryAddMessage 将消息添加到该批次。
- 使用 ServiceBusSender.SendMessagesAsync 方法将批量消息发送到“服务总线”队列。
重要
使用服务总线命名空间和队列的名称更新代码片段中的占位符值(
<NAMESPACE-NAME>
和<QUEUE-NAME>
)。using Azure.Messaging.ServiceBus; using Azure.Identity; // name of your Service Bus queue // the client that owns the connection and can be used to create senders and receivers ServiceBusClient client; // the sender used to publish messages to the queue ServiceBusSender sender; // number of messages to be sent to the queue const int numOfMessages = 3; // The Service Bus client types are safe to cache and use as a singleton for the lifetime // of the application, which is best practice when messages are being published or read // regularly. // // Set the transport type to AmqpWebSockets so that the ServiceBusClient uses the port 443. // If you use the default AmqpTcp, ensure that ports 5671 and 5672 are open. var clientOptions = new ServiceBusClientOptions { TransportType = ServiceBusTransportType.AmqpWebSockets }; //TODO: Replace the "<NAMESPACE-NAME>" and "<QUEUE-NAME>" placeholders. client = new ServiceBusClient( "<NAMESPACE-NAME>.servicebus.windows.net", new DefaultAzureCredential(), clientOptions); sender = client.CreateSender("<QUEUE-NAME>"); // create a batch using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync(); for (int i = 1; i <= numOfMessages; i++) { // try adding a message to the batch if (!messageBatch.TryAddMessage(new ServiceBusMessage($"Message {i}"))) { // if it is too large for the batch throw new Exception($"The message {i} is too large to fit in the batch."); } } try { // Use the producer client to send the batch of messages to the Service Bus queue await sender.SendMessagesAsync(messageBatch); Console.WriteLine($"A batch of {numOfMessages} messages has been published to the queue."); } finally { // Calling DisposeAsync on client types is required to ensure that network // resources and other unmanaged objects are properly cleaned up. await sender.DisposeAsync(); await client.DisposeAsync(); } Console.WriteLine("Press any key to end the application"); Console.ReadKey();
- 使用
生成项目并确保没有错误。
运行程序并等待出现确认消息。
A batch of 3 messages has been published to the queue
重要
在大多数情况下,角色分配在 Azure 中传播需要一两分钟。 在极少数情况下,最多可能需要 8 分钟才能完成。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。
在 Azure 门户中按照以下步骤操作:
请注意以下值:
- 队列的活动消息计数值现在为 3。 每次运行此发件人应用而不检索消息时,该值会增加 3。
- 每次应用向队列添加消息时,队列的当前大小都会递增。
- 在消息图表中的底部指标部分中,可以看到队列有三条传入的消息。
从队列接收消息
在本部分中,你将创建一个 .NET 控制台应用程序,用于接收来自队列的消息。
注意
本快速入门分步介绍了如何实施一个方案,将一批消息发送到服务总线队列并接收这些消息。 有关其他和高级方案的更多示例,请参阅 GitHub 上的服务总线 .NET 示例。
为接收器创建项目
- 在“解决方案资源管理器”窗口中,右键单击 ServiceBusQueueQuickStart 解决方案,指向添加,然后选择新建项目。
- 选择控制台应用程序,然后选择下一步。
- 输入 QueueReceiver 作为项目名称,然后选择创建。
- 在解决方案资源管理器窗口中,右键单击 QueueReceiver,然后选择设为启动项目。
向项目添加 NuGet 包
在菜单中选择“工具”>“NuGet 包管理器”>“包管理器控制台”。
选择“QueueReceiver”作为“默认项目”。
运行以下命令以安装 Azure.Messaging.ServiceBus NuGet 包。
Install-Package Azure.Messaging.ServiceBus
运行以下命令以安装 Azure.Identity NuGet 包。
Install-Package Azure.Identity
添加从队列接收消息的代码
在本部分中,你将添加从队列检索消息的代码。
在
Program
类中,添加以下代码:using System.Threading.Tasks; using Azure.Identity; using Azure.Messaging.ServiceBus; // the client that owns the connection and can be used to create senders and receivers ServiceBusClient client; // the processor that reads and processes messages from the queue ServiceBusProcessor processor;
将以下方法追加到
Program
类的末尾。// handle received messages async Task MessageHandler(ProcessMessageEventArgs args) { string body = args.Message.Body.ToString(); Console.WriteLine($"Received: {body}"); // complete the message. message is deleted from the queue. await args.CompleteMessageAsync(args.Message); } // handle any errors when receiving messages Task ErrorHandler(ProcessErrorEventArgs args) { Console.WriteLine(args.Exception.ToString()); return Task.CompletedTask; }
将以下代码追加到
Program
类的末尾。 以下部分概述了重要步骤,并在代码注释中提供了其他信息。- 使用
DefaultAzureCredential
对象创建 ServiceBusClient 对象。DefaultAzureCredential
将自动发现并使用你的 Visual Studio 登录凭据向 Azure 服务总线进行身份验证。 - 对
ServiceBusClient
对象调用 CreateProcessor 方法,从而为指定的“服务总线”队列创建 ServiceBusProcessor 对象。 - 为 ServiceBusProcessor 对象的 ProcessMessageAsync 和 ProcessErrorAsync 事件指定处理程序。
- 通过对
ServiceBusProcessor
对象调用 StartProcessingAsync 以开始处理消息。 - 当用户按下某个键结束处理时,将对
ServiceBusProcessor
对象调用 StopProcessingAsync。
重要
使用服务总线命名空间和队列的名称更新代码片段中的占位符值(
<NAMESPACE-NAME>
和<QUEUE-NAME>
)。// The Service Bus client types are safe to cache and use as a singleton for the lifetime // of the application, which is best practice when messages are being published or read // regularly. // // Set the transport type to AmqpWebSockets so that the ServiceBusClient uses port 443. // If you use the default AmqpTcp, make sure that ports 5671 and 5672 are open. // TODO: Replace the <NAMESPACE-NAME> placeholder var clientOptions = new ServiceBusClientOptions() { TransportType = ServiceBusTransportType.AmqpWebSockets }; client = new ServiceBusClient( "<NAMESPACE-NAME>.servicebus.windows.net", new DefaultAzureCredential(), clientOptions); // create a processor that we can use to process the messages // TODO: Replace the <QUEUE-NAME> placeholder processor = client.CreateProcessor("<QUEUE-NAME>", new ServiceBusProcessorOptions()); try { // add handler to process messages processor.ProcessMessageAsync += MessageHandler; // add handler to process any errors processor.ProcessErrorAsync += ErrorHandler; // start processing await processor.StartProcessingAsync(); Console.WriteLine("Wait for a minute and then press any key to end the processing"); Console.ReadKey(); // stop processing Console.WriteLine("\nStopping the receiver..."); await processor.StopProcessingAsync(); Console.WriteLine("Stopped receiving messages"); } finally { // Calling DisposeAsync on client types is required to ensure that network // resources and other unmanaged objects are properly cleaned up. await processor.DisposeAsync(); await client.DisposeAsync(); }
- 使用
完成的
Program
类应与以下代码匹配:using System.Threading.Tasks; using Azure.Messaging.ServiceBus; using Azure.Identity; // the client that owns the connection and can be used to create senders and receivers ServiceBusClient client; // the processor that reads and processes messages from the queue ServiceBusProcessor processor; // The Service Bus client types are safe to cache and use as a singleton for the lifetime // of the application, which is best practice when messages are being published or read // regularly. // // Set the transport type to AmqpWebSockets so that the ServiceBusClient uses port 443. // If you use the default AmqpTcp, make sure that ports 5671 and 5672 are open. // TODO: Replace the <NAMESPACE-NAME> and <QUEUE-NAME> placeholders var clientOptions = new ServiceBusClientOptions() { TransportType = ServiceBusTransportType.AmqpWebSockets }; client = new ServiceBusClient("<NAMESPACE-NAME>.servicebus.windows.net", new DefaultAzureCredential(), clientOptions); // create a processor that we can use to process the messages // TODO: Replace the <QUEUE-NAME> placeholder processor = client.CreateProcessor("<QUEUE-NAME>", new ServiceBusProcessorOptions()); try { // add handler to process messages processor.ProcessMessageAsync += MessageHandler; // add handler to process any errors processor.ProcessErrorAsync += ErrorHandler; // start processing await processor.StartProcessingAsync(); Console.WriteLine("Wait for a minute and then press any key to end the processing"); Console.ReadKey(); // stop processing Console.WriteLine("\nStopping the receiver..."); await processor.StopProcessingAsync(); Console.WriteLine("Stopped receiving messages"); } finally { // Calling DisposeAsync on client types is required to ensure that network // resources and other unmanaged objects are properly cleaned up. await processor.DisposeAsync(); await client.DisposeAsync(); } // handle received messages async Task MessageHandler(ProcessMessageEventArgs args) { string body = args.Message.Body.ToString(); Console.WriteLine($"Received: {body}"); // complete the message. message is deleted from the queue. await args.CompleteMessageAsync(args.Message); } // handle any errors when receiving messages Task ErrorHandler(ProcessErrorEventArgs args) { Console.WriteLine(args.Exception.ToString()); return Task.CompletedTask; }
生成项目并确保没有错误。
运行接收器应用程序。 你应该会看到接收的消息。 按任意键来停止使用接收器和应用程序。
Wait for a minute and then press any key to end the processing Received: Message 1 Received: Message 2 Received: Message 3 Stopping the receiver... Stopped receiving messages
再次检查门户。 等待几分钟,如果未看到活动消息的
0
,请刷新页面。
清理资源
导航到 Azure 门户中的服务总线命名空间,然后在 Azure 门户上选择“删除”以删除命名空间及其中的队列。
另请参阅
请参阅以下文档和示例: