大多数 Service Broker 应用程序都按照相同的基本步骤来接收和处理消息:
应用程序开始一个事务。
如果应用程序需维护状态,则应用程序将获取一个会话组标识符。 应用程序使用此标识符从状态表还原状态。 如果不存在任何有准备接收的消息的会话组,则应用程序将回滚事务并退出。
应用程序从队列接收一个或多个消息。 如果应用程序具有会话组标识符,则应用程序使用此会话组标识符接收相应会话组的消息。 如果不再有消息可供接收,则应用程序提交事务并返回步骤 1。
应用程序根据消息类型名称验证消息的内容。
应用程序根据消息类型名称和消息的内容处理消息。
应用程序发送处理后的所有消息。
如果应用程序需维护状态,则应用程序会更新状态表,并使用会话组标识符作为此表的主键。
应用程序返回步骤 3 以检查是否有其他消息可用。
一个应用程序的确切结构取决于该应用程序的要求和通信方式、该应用程序是目标服务还是起始服务以及 Service Broker 是否激活该应用程序。
例如,起始应用程序在发送消息之后才会开始上述步骤中概述的处理循环。 起始服务可能通过其他程序或存储过程发送消息,然后为起始服务队列使用激活存储过程。 例如,订单输入应用程序可能包含用于启动会话以输入订单的外部应用程序。 在输入订单之后,外部应用程序不需要保持运行状态。 当从订单服务返回响应时,起始服务的激活存储过程将发送订单确认。 激活存储过程还将处理由目标服务返回的任何 Service Broker 错误消息,并且发送指示无法确认订单的通知。
另外,起始应用程序可能会发送消息然后作为同一程序的一部分开始处理循环,而不是通过其他程序发送消息。 不管这些差异如何,总体步骤基本都是相同的。
如果一个应用程序需要处理同一会话组中的大量消息,该应用程序可能会对接收到的消息进行计数,并在处理一定数量的消息之后提交事务。 利用这一“计数与提交”策略,应用程序可使事务保持相对较短,并可以处理多个不同的会话组。
示例
以下 Transact-SQL 示例处理 MyServiceQueue 队列中的所有消息。 已对消息的处理进行了最大限度的简化。 如果出现 EndDialog 或 Error 消息,则此代码将结束会话。 对于其他任何消息,此代码将创建相应消息的 XML 表示形式,并且生成包含会话句柄、消息类型名称和 XML 的结果集。 如果在 500 毫秒内没有任何可用消息,则此代码将退出。
为简化起见,此脚本为每个消息都生成结果集。 如果在读取队列期间发生错误,则此脚本提交更改,而不生成任何结果。 因此,此脚本将自行删除导致错误的任何消息。
注意
由于此脚本只是显示消息,因而对于此脚本不会存在有害消息。 因此,此脚本不包含处理有害消息的代码。 编写生产应用程序时应包含处理有害消息的代码。 有关有害消息的详细信息,请参阅处理有害消息。
注意
这些示例在 SQL Server 2008 R2 (10.50.x) 上进行了验证。 建议使用 AdventureWorks 示例数据库中的 AdventureWorks2008R2
示例数据库来完成示例。
当前版本的 SQL Server 支持 SQL Server Service Broker。
USE [AdventureWorks2022];
GO
-- Process all conversation groups.
WHILE (1 = 1)
BEGIN
DECLARE
@conversation_handle UNIQUEIDENTIFIER,
@conversation_group_id UNIQUEIDENTIFIER,
@message_body XML,
@message_type_name NVARCHAR(128);
-- Begin a transaction, one per conversation group.
BEGIN TRANSACTION;
-- Get next conversation group.
WAITFOR (
GET CONVERSATION GROUP @conversation_group_id
FROM [MyServiceQueue]
),
TIMEOUT 500;
-- Restore the state for this conversation group here
-- If there are no more conversation groups, break.
IF @conversation_group_id IS NULL
BEGIN
ROLLBACK TRANSACTION;
BREAK;
END;
-- Process all messages in the conversation group.
WHILE 1 = 1
BEGIN
-- Get the next message.
RECEIVE TOP (1)
@conversation_handle = [conversation_handle],
@message_type_name = [message_type_name],
@message_body = CASE
WHEN [validation] = 'X'
THEN
CAST([message_body] AS XML)
ELSE
CAST(N'<none/>' AS XML)
END
FROM [MyServiceQueue]
WHERE CONVERSATION_GROUP_ID = @conversation_group_id;
-- If there is no message, or there is an error
-- reading from the queue, break.
IF @@ROWCOUNT = 0
OR @@ERROR <> 0
BREAK;
-- Process the message. In this case, the program ends the conversation
-- for Error and EndDialog messages. For all other messages, the program
-- produces a result set with information about the message.
SELECT
@conversation_handle,
@message_type_name,
@message_body;
-- If the message is an end dialog message or an error,
-- end the conversation. Notice that other conversations
-- in the same conversation group may still have messages
-- to process. Therefore, the program does not break after
-- ending the conversation.
IF @message_type_name = 'https://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
OR @message_type_name = 'https://schemas.microsoft.com/SQL/ServiceBroker/Error'
BEGIN
END CONVERSATION @conversation_handle;
END;
END; -- Process all messages in conversation group.
COMMIT TRANSACTION;
END; -- Process all conversation groups.