MSMQ 4.0 中的病毒消息处理

MSMQ4 示例演示如何在服务中执行有害消息处理。 该示例基于事务处理 MSMQ 绑定示例。 此示例使用 netMsmqBinding. 此服务是自承载控制台应用程序,通过它可以观察服务接收排队消息。

在排队通信中,客户端使用队列与服务通信。 更确切地说,客户端将消息发送到队列。 服务从队列接收消息。 因此,服务与客户端不必同时运行,才能使用队列进行通信。

病毒消息是一类当服务读取消息时不能对消息进行处理,并因此终止从中读取消息的事务时从队列重复读取的消息。 在这种情况下,消息将会被再次尝试发送。 如果消息有问题,这在理论上可以永远继续下去。 这种情况仅在你使用事务从队列中读取消息并调用服务操作时发生。

根据 MSMQ 的版本,NetMsmqBinding 支持从有限检测到完全检测有害消息。 消息被检测为中毒后,可以通过多种方式进行处理。 同样,根据 MSMQ 版本,NetMsmqBinding 支持对病毒消息进行有限处理和完全处理。

此示例演示了 Windows Server 2003 和 Windows XP 平台上提供的有限有害设施以及 Windows Vista 上提供的完整有害设施。 在这两个示例中,目标是将病毒消息从队列移动到另一个队列。 有害消息服务随后即可为该队列提供服务。

MSMQ v4.0 病毒处理示例

在 Windows Vista 中,MSMQ 提供了可用于存储病毒消息的有害子队列设施。 此示例演示了使用 Windows Vista 处理有害消息的最佳做法。

Windows Vista 中的病毒消息检测非常复杂。 有 3 个属性可帮助检测。 给定 ReceiveRetryCount 消息从队列中重新读取并调度到应用程序进行处理的次数。 当由于某一消息无法调度到应用程序或应用程序在服务操作中回滚事务,该消息返回到队列中时,即会从队列中重新读取该消息。 MaxRetryCycles 是消息移动到重试队列的次数。 当达到 ReceiveRetryCount 时,消息将被移动到重试队列。 该属性 RetryCycleDelay 是消息从重试队列移回主队列之后的时间延迟。 重置 ReceiveRetryCount 为 0。 再次尝试消息。 如果所有读取消息的尝试都失败,则消息将标记为有害。

消息标记为有害后,将根据枚举中的 ReceiveErrorHandling 设置处理该消息。 若要重申可能的值,

  • Fault(默认值):将侦听器和服务主机视为出现故障。

  • 放置:将放置消息。

  • Move:将消息移到有害消息子队列中。 此值仅在 Windows Vista 上可用。

  • Reject:要通过将消息发送回发送方死信队列来拒绝消息。 此值仅在 Windows Vista 上可用。

示例演示了如何使用 Move 处理病毒消息。 Move 可使消息移动到有害子队列中。

服务协定是IOrderProcessor,它定义了一种适合用于队列的单向服务。

[ServiceContract(Namespace="http://Microsoft.ServiceModel.Samples")]
public interface IOrderProcessor
{
    [OperationContract(IsOneWay = true)]
    void SubmitPurchaseOrder(PurchaseOrder po);
}

服务操作显示一条消息,正在处理订单。 为了演示有害消息功能,SubmitPurchaseOrder 服务操作将引发异常,以便在任意一次调用服务时回滚事务。 这会导致消息被放回队列中。 最终,消息被标记为有害。 配置设置为将病毒消息移动到病毒子队列。

// Service class that implements the service contract.
// Added code to write output to the console window.
public class OrderProcessorService : IOrderProcessor
{
    static Random r = new Random(137);

    [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
    public void SubmitPurchaseOrder(PurchaseOrder po)
    {

        int randomNumber = r.Next(10);

        if (randomNumber % 2 == 0)
        {
            Orders.Add(po);
            Console.WriteLine("Processing {0} ", po);
        }
        else
        {
            Console.WriteLine("Aborting transaction, cannot process purchase order: " + po.PONumber);
            Console.WriteLine();
            throw new Exception("Cannot process purchase order: " + po.PONumber);
        }
    }

    public static void OnServiceFaulted(object sender, EventArgs e)
    {
        Console.WriteLine("Service Faulted");
    }

    // Host the service within this EXE console application.
    public static void Main()
    {
        // Get MSMQ queue name from app settings in configuration.
        string queueName = ConfigurationManager.AppSettings["queueName"];

        // Create the transacted MSMQ queue if necessary.
        if (!System.Messaging.MessageQueue.Exists(queueName))
            System.Messaging.MessageQueue.Create(queueName, true);

        // Get the base address that is used to listen for WS-MetaDataExchange requests.
        // This is useful to generate a proxy for the client.
        string baseAddress = ConfigurationManager.AppSettings["baseAddress"];

        // Create a ServiceHost for the OrderProcessorService type.
        ServiceHost serviceHost = new ServiceHost(typeof(OrderProcessorService), new Uri(baseAddress));

        // Hook on to the service host faulted events.
        serviceHost.Faulted += new EventHandler(OnServiceFaulted);

        // Open the ServiceHostBase to create listeners and start listening for messages.
        serviceHost.Open();

        // The service can now be accessed.
        Console.WriteLine("The service is ready.");
        Console.WriteLine("Press <ENTER> to terminate service.");
        Console.WriteLine();
        Console.ReadLine();

        if(serviceHost.State != CommunicationState.Faulted) {
            serviceHost.Close();
        }

    }
}

服务配置包括以下病毒消息属性:receiveRetryCount、、maxRetryCyclesretryCycleDelayreceiveErrorHandling如以下配置文件所示。

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <appSettings>
    <!-- Use appSetting to configure MSMQ queue name. -->
    <add key="queueName" value=".\private$\ServiceModelSamplesPoison" />
    <add key="baseAddress" value="http://localhost:8000/orderProcessor/poisonSample"/>
  </appSettings>
  <system.serviceModel>
    <services>
      <service
              name="Microsoft.ServiceModel.Samples.OrderProcessorService">
        <!-- Define NetMsmqEndpoint -->
        <endpoint address="net.msmq://localhost/private/ServiceModelSamplesPoison"
                  binding="netMsmqBinding"
                  bindingConfiguration="PoisonBinding"
                  contract="Microsoft.ServiceModel.Samples.IOrderProcessor" />
      </service>
    </services>

    <bindings>
      <netMsmqBinding>
        <binding name="PoisonBinding"
                 receiveRetryCount="0"
                 maxRetryCycles="1"
                 retryCycleDelay="00:00:05"
                 receiveErrorHandling="Move">
        </binding>
      </netMsmqBinding>
    </bindings>
  </system.serviceModel>
</configuration>

处理病毒消息队列中的消息

病毒消息服务从最终病毒消息队列中读取消息并处理它们。

病毒消息队列中的消息是发送到正在处理消息的服务的消息,该消息可能不同于病毒消息服务终结点。 因此,当病毒消息服务从队列中读取消息时,WCF 通道层会查找终结点中的不匹配情况,并且不会调度消息。 在这种情况下,消息将发送到订单处理服务,但将由病毒消息服务接收。 若要继续接收消息,即使消息被寻址到其他终结点,我们必须添加一个 ServiceBehavior 筛选地址,其中匹配条件是匹配消息所处理的任何服务终结点。 这是成功处理从病毒消息队列中读取的消息所必需的。

病毒消息服务实现本身与服务实现非常相似。 它实现协定并处理订单。 代码示例如下所示。

// Service class that implements the service contract.
// Added code to write output to the console window.
[ServiceBehavior(AddressFilterMode=AddressFilterMode.Any)]
public class OrderProcessorService : IOrderProcessor
{
    [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
    public void SubmitPurchaseOrder(PurchaseOrder po)
    {
        Orders.Add(po);
        Console.WriteLine("Processing {0} ", po);
    }

    public static void OnServiceFaulted(object sender, EventArgs e)
    {
        Console.WriteLine("Service Faulted...exiting app");
        Environment.Exit(1);
    }

    // Host the service within this EXE console application.
    public static void Main()
    {

        // Create a ServiceHost for the OrderProcessorService type.
        ServiceHost serviceHost = new ServiceHost(typeof(OrderProcessorService));

        // Hook on to the service host faulted events.
        serviceHost.Faulted += new EventHandler(OnServiceFaulted);

        serviceHost.Open();

        // The service can now be accessed.
        Console.WriteLine("The poison message service is ready.");
        Console.WriteLine("Press <ENTER> to terminate service.");
        Console.WriteLine();
        Console.ReadLine();

        // Close the ServiceHostBase to shutdown the service.
        if(serviceHost.State != CommunicationState.Faulted)
        {
            serviceHost.Close();
        }
    }

与从订单队列读取消息的订单处理服务不同,病毒消息服务从病毒子队列中读取消息。 病毒队列是主队列的子队列,名为“病毒”,由 MSMQ 自动生成。 若要访问它,请提供主队列名称,然后是“;”和子队列名称,本例中为“poison”,如以下示例配置所示。

注释

在 MSMQ 3.0 版的示例中,病毒队列名称不是子队列,而是将消息移动到的队列。

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <system.serviceModel>
    <services>
      <service name="Microsoft.ServiceModel.Samples.OrderProcessorService">
        <!-- Define NetMsmqEndpoint -->
        <endpoint address="net.msmq://localhost/private/ServiceModelSamplesPoison;poison"
                  binding="netMsmqBinding"
                  contract="Microsoft.ServiceModel.Samples.IOrderProcessor" >
        </endpoint>
      </service>
    </services>

  </system.serviceModel>
</configuration>

运行示例时,控制台窗口中会显示客户端、服务和有害消息服务活动。 可以看到服务从客户端接收消息。 在每个控制台窗口中按 Enter 关闭服务。

服务开始运行、处理订单和随机开始终止处理。 如果消息指示它已处理订单,则可以再次运行客户端以发送另一条消息,直到看到服务实际终止消息。 根据配置的病毒设置,在将消息移动到最终病毒队列之前,会尝试一次处理消息。

The service is ready.
Press <ENTER> to terminate service.

Processing Purchase Order: 0f063b71-93e0-42a1-aa3b-bca6c7a89546
        Customer: somecustomer.com
        OrderDetails
                Order LineItem: 54 of Blue Widget @unit price: $29.99
                Order LineItem: 890 of Red Widget @unit price: $45.89
        Total cost of this order: $42461.56
        Order status: Pending

Processing Purchase Order: 5ef9a4fa-5a30-4175-b455-2fb1396095fa
        Customer: somecustomer.com
        OrderDetails
                Order LineItem: 54 of Blue Widget @unit price: $29.99
                Order LineItem: 890 of Red Widget @unit price: $45.89
        Total cost of this order: $42461.56
        Order status: Pending

Aborting transaction, cannot process purchase order: 23e0b991-fbf9-4438-a0e2-20adf93a4f89

启动病毒消息服务,从病毒队列中读取有害消息。 在此示例中,病毒消息服务读取消息并处理它。 您可以看到病毒消息服务读取已终止并已中毒的采购订单。

The service is ready.
Press <ENTER> to terminate service.

Processing Purchase Order: 23e0b991-fbf9-4438-a0e2-20adf93a4f89
        Customer: somecustomer.com
        OrderDetails
                Order LineItem: 54 of Blue Widget @unit price: $29.99
                Order LineItem: 890 of Red Widget @unit price: $45.89
        Total cost of this order: $42461.56
        Order status: Pending

设置、生成和运行示例

  1. 确保已为 Windows Communication Foundation 示例 执行One-Time 安装过程。

  2. 如果先运行服务,则它将检查以确保队列存在。 如果队列不存在,服务将创建一个队列。 可以先运行服务来创建队列,也可以通过 MSMQ 队列管理器创建一个队列。 按照以下步骤在 Windows 2008 中创建队列。

    1. 在 Visual Studio 2012 中打开服务器管理器。

    2. 展开“功能”选项卡

    3. 右键单击 专用消息队列,然后选择 “新建”,专用队列

    4. 选中“事务性”框

    5. 输入 ServiceModelSamplesTransacted 作为新队列的名称。

  3. 若要生成解决方案的 C# 或 Visual Basic .NET 版本,请按照 生成 Windows Communication Foundation 示例中的说明进行操作。

  4. 若要在单计算机配置或跨计算机配置中运行示例,请更改队列名称以反映实际主机名而不是 localhost,并按照 运行 Windows Communication Foundation 示例中的说明进行作。

默认情况下, netMsmqBinding 绑定传输会启用安全性。 两个属性 MsmqAuthenticationMode ,以及 MsmqProtectionLevel共同确定传输安全性的类型。 默认情况下,身份验证模式设置为 Windows ,保护级别设置为 Sign。 要使 MSMQ 提供身份验证和签名功能,它必须是域的一部分。 如果在不属于域的计算机上运行此示例,则会收到以下错误:“用户的内部消息队列证书不存在”。

在加入到工作组的计算机上运行此示例

  1. 如果计算机不是域的一部分,请将身份验证模式和保护级别设置为 None 关闭传输安全性,如以下示例配置中所示:

    <bindings>
        <netMsmqBinding>
            <binding name="TransactedBinding">
                <security mode="None"/>
            </binding>
        </netMsmqBinding>
    </bindings>
    

    通过设置终结点的 bindingConfiguration 属性,确保终结点与绑定相关联。

  2. 在运行示例之前,请确保在 PoisonMessageServer、服务器和客户端上更改配置。

    注释

    security mode设置为None等同于将MsmqAuthenticationModeMsmqProtectionLevelMessage的安全性设置为None

  3. 为了使 Meta Data Exchange 正常工作,我们使用 http 绑定注册 URL。 这要求服务在提升的命令窗口中运行。 否则,您将收到如下异常:Unhandled Exception: System.ServiceModel.AddressAccessDeniedException: HTTP could not register URL http://+:8000/ServiceModelSamples/service/. Your process does not have access rights to this namespace (see https://go.microsoft.com/fwlink/?LinkId=70353 for details). ---> System.Net.HttpListenerException: Access is denied