处理有害消息

本主题介绍的是使用 Service Broker 的应用程序可在不依赖有害消息自动检测的情况下检测有害消息并将有害消息从队列中删除的一种方式。

Service Broker 提供了有害消息自动检测功能。如果从队列接收消息的事务回滚了五次,则有害消息自动检测会将该队列状态设为 OFF。此功能可防止发生应用程序不能以编程方式检测的灾难性故障。不过,应用程序不应依赖此功能进行常规处理。因为有害消息自动检测会停止队列,所以在删除有害消息之前,此功能实际上会暂停应用程序的所有处理工作。应用程序应改为尝试使用应用程序逻辑来检测和删除有害消息。

本节中概述的策略假定:如果一个消息已失败一定次数,则应将其删除。对于多数应用程序而言,此假定是有效的。不过,在应用程序中使用此策略之前,请考虑以下问题:

  • 对于您的应用程序,失败计数可靠吗?根据应用程序的具体情况,消息多次失败可能是正常的。例如,在订单输入应用程序中,与添加新客户记录的服务相比,处理订单的服务的处理时间可能要短一些。在这种情况下,新客户的订单未能即刻得到处理可能属于正常情况。在确定一个消息是否为有害消息时,应用程序需要考虑延迟因素。相关服务可能需要允许数次失败后,才将该消息删除。

  • 您的应用程序能否通过快速可靠地检查消息内容来检测该消息是否永远不会成功?如果能做到此点,这种策略要优于对程序处理该消息失败的次数进行计数。例如,无法处理未包含雇员姓名或雇员 ID 号的费用报表。在这种情况下,如果程序能够即刻对因存在错误而无法处理的消息做出响应,而不是尝试处理该消息,则该程序的效率可能会高得多。同时还请考虑其他验证。例如,如果存在 ID 号,但是 ID 号超出了指定的数字范围(例如为负数),则应用程序可以立即结束会话。

  • 是否一旦失败就应删除消息?如果应用程序要处理大量的消息,而每条消息的使用时间都较为有限,则立即删除任何可导致操作失败的消息可能是最为有效的方式。例如,如果某消息用于提供来自目标服务的进度报告,则起始服务可以选择丢弃空的进度报告,其方法是:提交接收结果,但不处理该消息。在这种情况下,会话将继续。

在决定应用程序以何种方式处理有害消息时,请考虑以下问题:

  • 应用程序是否应记录消息的失败结果及其内容?在多数情况下,不需要这么做。不过,对于某些应用程序,可能需要保留消息的内容。

  • 应用程序是否应记录有关失败的其他信息?在某些情况下,您可能需要跟踪会话的其他信息。例如,您可能使用目录视图 sys.conversation_endpoints 来标识生成有害消息的远程 Broker 实例。

  • 应用程序是否应结束发生错误的会话,或者相应服务的约定是否允许应用程序在不关闭会话的情况下指示错误?对于多数服务,接收有害消息即意味着在约定中指定的任务无法完成。在这种情况下,应用程序会结束发生错误的会话。在其他情况下,即使一个消息失败,会话可能仍会继续。例如,从仓库接收库存数据的服务可能会偶尔收到带有未知部件号的消息。该服务可以将该消息保存在一个单独的表中以待操作员稍后检查,而不是结束会话。

示例:检测有害消息

此 Transact-SQL 示例显示的是一个简单的无状态服务,该服务包含处理有害消息的逻辑。在存储过程接收消息之前,该过程会保存相应事务。如果该过程无法处理某消息,它会将该事务回滚到相应的保存点。部分回滚功能会将该消息返回队列,同时继续持有该消息所属的会话组的锁。因为该程序会继续持有该会话组的锁,所以它可以更新用于维护失败消息列表的表,从而排除了其他队列读取器可能处理该消息的风险。

以下示例定义了该应用程序的激活存储过程:

CREATE PROCEDURE ProcessExpenseReport
AS
BEGIN
  WHILE (1 = 1)
    BEGIN
      BEGIN TRANSACTION ;
      DECLARE @conversationHandle UNIQUEIDENTIFIER ;
      DECLARE @messageBody VARBINARY(MAX) ;
      DECLARE @messageTypeName NVARCHAR(256) ;

      SAVE TRANSACTION UndoReceive ;

        WAITFOR ( 
                  RECEIVE TOP(1)
                    @messageTypeName = message_type_name,
                    @messageBody = message_body,
                    @conversationHandle = conversation_handle
                    FROM ExpenseQueue
                 ), TIMEOUT 500 ;

        IF @@ROWCOUNT = 0
        BEGIN
          ROLLBACK TRANSACTION ;
          BREAK ;
        END ;

        -- Typical message processing loop: dispatch to a stored
        -- procedure based on the message type name.  End conversation
        -- with an error for unknown message types.

        -- Process expense report messages. If processing fails,
        -- roll back to the save point and track the failed message.

        IF (@messageTypeName =
              '//Adventure-Works.com/AccountsPayable/ExpenseReport')
          BEGIN
            DECLARE @expenseReport NVARCHAR(MAX) ;
            SET @expenseReport = CAST(@messageBody AS NVARCHAR(MAX)) ;
            EXEC AdventureWorks.dbo.AddExpenseReport
              @report = @expenseReport ;
            IF @@ERROR <> 0
             BEGIN
               ROLLBACK TRANSACTION UndoReceive ;
               EXEC TrackMessage @conversationHandle ;
             END ;
            ELSE
             BEGIN
               EXEC AdventureWorks.dbo.ClearMessageTracking
                 @conversationHandle ;
             END ;
           END ;
        ELSE

        -- For error messages and end dialog messages, end the
        -- conversation.

        IF (@messageTypeName =
              'https://schemas.microsoft.com/SQL/ServiceBroker/Error' OR
             @messageTypeName =
              'https://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
          BEGIN
            END CONVERSATION @conversationHandle ;
            EXEC dbo.ClearMessageTracking @conversationHandle ;
          END ;


         COMMIT TRANSACTION ;
    END ;
END ;

存储过程 TrackMessage 用于跟踪一条消息已失败的次数。如果消息以前未失败过,该过程会将一个针对该消息的新计数器插入到表 ExpenseServiceFailedMessages 中。否则,该过程会检查计数器以查看该消息已失败的次数。在计数器没有达到预定义数量之前,该过程会递增计数器。在计数器超过预定义的数目后,该过程会结束发生错误的会话,并从表中删除该会话的计数器。

CREATE PROCEDURE TrackMessage
@conversationHandle uniqueidentifier
AS
BEGIN
  IF @conversationHandle IS NULL
    RETURN ;

  DECLARE @count INT ;
  SET @count = NULL ;
  SET @count = (SELECT count FROM dbo.ExpenseServiceFailedMessages
                  WHERE conversation_handle = @conversationHandle) ;

  IF @count IS NULL
    BEGIN
      INSERT INTO dbo.ExpenseServiceFailedMessages
        (count, conversation_handle)
        VALUES (1, @conversationHandle) ;
    END ;
  IF @count > 3
    BEGIN
      EXEC dbo.ClearMessageTracking @conversationHandle ;
      END CONVERSATION @conversationHandle
        WITH ERROR = 500
        DESCRIPTION = 'Unable to process message.' ;
    END ;
  ELSE
    BEGIN
      UPDATE dbo.ExpenseServiceFailedMessages
        SET count=count+1
        WHERE conversation_handle = @conversationHandle ;
    END ;
END ;
GO

表 ExpenseServiceFailedMessages 的定义仅包含 conversation_handle 列和 count 列,如下例所示:

CREATE TABLE ExpenseServiceFailedMessages (
  conversation_handle uniqueidentifier PRIMARY KEY,
  count smallint
) ;

过程 ClearMessageTracking 将从表 ExpenseServiceFailedMessages 中删除会话的计数器,如下例所示:

CREATE PROCEDURE ClearMessageTracking
  @conversationHandle uniqueidentifier
AS
BEGIN
   DELETE FROM dbo.ExpenseServiceFailedMessages
     WHERE conversation_handle = @conversationHandle ;
END ;
GO

此处显示的策略已经过特意简化。在您根据需要生成应用程序时,可以本主题中介绍的内容作为基础。例如,如果您的应用程序需要维护状态,则将失败消息的跟踪信息纳入该应用程序的状态表中会更为高效。

以上介绍的存储过程不会处理将导致事务失败的错误。如果服务接收了导致整个事务失败的消息,此事务将回滚。如果这种情况发生了五次,有害消息自动检测会将队列状态设为 OFF。在这种情况下,必须通过其他应用程序或者由管理员将有害消息删除。

如果您认为对消息执行的处理可能导致事务失败,则可以使用 TRY 和 CATCH 语句处理该错误。有关错误处理的详细信息,请参阅 处理数据库引擎错误