MSMQ 3.0 中的病毒消息处理
本示例演示如何在服务中执行病毒消息处理。本示例基于已经过事务处理的 MSMQ 绑定示例。其中使用到了 netMsmqBinding。该服务是自承载控制台应用程序,通过它可以观察服务接收排队消息。
在排队通信中,客户端使用队列与服务进行通信。更确切地说,客户端向队列发送消息。服务从队列接收消息。因此不必同时运行服务和客户端便可使用队列进行通信。
病毒消息是一类当服务读取消息时不能对消息进行处理,并因此终止从中读取消息的事务时从队列重复读取的消息。在这种情况下,将再次重试消息。如果消息出现问题,这种情况在理论上将永远继续下去。注意,这种情况仅在您使用事务从队列中读取消息并调用服务操作时发生。
根据 MSMQ 版本,NetMsmqBinding 支持对病毒消息进行有限检测和完全检测。在已经将消息检测为病毒后,可以通过多种方式对消息进行处理。
本示例演示 Windows Server 2003 和 Windows XP 平台上提供的有限病毒功能和 Windows Vista 上提供的完全病毒功能。这两个示例的目的是将病毒消息从一个队列移出到另一个队列,病毒消息服务随后即可为后者提供服务。
MSMQ v3.0(Windows Server 2003 和 Windows XP)病毒处理示例
MSMQ 3.0 版不提供病毒子队列支持。因此,在此示例中,我们创建一个用于保存病毒消息的队列。这是 MSMQ 3.0 版中处理病毒消息以阻止数据丢失的首选方法。
Windows Server 2003 和 Windows XP 中的病毒消息检测限制为配置单个属性 ReceiveRetryCount。ReceiveRetryCount 是可以从队列中重复读取消息的次数。Windows Communication Foundation (WCF) 在内存中保存此计数。达到该计数后(即,消息已中毒),我们立即转换到如何处理病毒消息。绑定上的枚举 ReceiveErrorHandling 属性提供它可以采用的可能值。枚举值为:
- Fault(默认值):将侦听器和服务主机视为出现故障。
- Drop:将删除消息。
- Move:将消息移到病毒消息子队列中。此值仅在 Windows Vista 中可用。
- Reject:要通过将消息发送回发送方死信队列来拒绝消息。此值仅在 Windows Vista 中可用。
当使用 MSMQ 3.0 版时,在这些值中仅 Fault 和 Drop 是有效值。本示例演示如何使用 MSMQ 3.0 版处理病毒消息。本示例也可以运行在 MSMQ 4.0 版中,运行方式与使用 MSMQ 3.0 版时完全一样。
服务协定是 IOrderProcessor
,它定义了适合与队列一起使用的单向服务。
[ServiceContract(Namespace="http://Microsoft.ServiceModel.Samples")]
public interface IOrderProcessor
{
[OperationContract(IsOneWay = true)]
void SubmitPurchaseOrder(PurchaseOrder po);
}
该服务操作显示一条指示它正在处理订单的消息。若要演示病毒消息功能,SubmitPurchaseOrder
服务操作将引发异常,用于回滚服务的某些随机调用上的事务。这将导致消息被放回队列中。并最终将消息标记为病毒。该配置在病毒消息上设置为 fault。一旦消息中毒,队列通道将引发 MsmqPoisonMessageException 并将 ServiceHost 视为故障,其中该异常包含病毒消息的 MessageLookupId。错误处理程序附加到此异常的监视器上并采取相应的措施。为了添加错误处理程序,我们将创建 PoisonErrorBehaviorAttribute
并使用此行为对 OrderProcessorService
进行批注。
// Service class that implements the service contract.
// Added code to write output to the console window.
[PoisonErrorBehavior]
public class OrderProcessorService : IOrderProcessor
{
static Random r = new Random(137);
public static string QueueName;
public static string PoisonQueueName;
[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
public void SubmitPurchaseOrder(PurchaseOrder po)
{
int randomNumber = r.Next(10);
if (randomNumber % 2 == 0)
{
Orders.Add(po);
Console.WriteLine("Processing {0} ", po);
}
else
{
Console.WriteLine("Aborting transaction, cannot process purchase order: " + po.PONumber);
Console.WriteLine();
throw new Exception("Cannot process purchase order: " + po.PONumber);
}
}
public static void StartThreadProc(object stateInfo)
{
StartService();
}
public static void StartService()
{
// Get MSMQ queue name from app settings in configuration
QueueName = ConfigurationManager.AppSettings["queueName"];
// Get MSMQ queue name for the final poison queue
PoisonQueueName = ConfigurationManager.AppSettings["poisonQueueName"];
// Create the transacted MSMQ queue if necessary.
if (!System.Messaging.MessageQueue.Exists(QueueName))
System.Messaging.MessageQueue.Create(QueueName, true);
// Create the transacted poison message MSMQ queue if necessary.
if (!System.Messaging.MessageQueue.Exists(PoisonQueueName))
System.Messaging.MessageQueue.Create(PoisonQueueName, true);
// Get the base address that is used to listen for WS-MetaDataExchange requests
// This is useful to generate a proxy for the client
string baseAddress = ConfigurationManager.AppSettings["baseAddress"];
// Create a ServiceHost for the OrderProcessorService type.
ServiceHost serviceHost = new ServiceHost(typeof(OrderProcessorService), new Uri(baseAddress));
// Hook on to the service host faulted events
serviceHost.Faulted += new EventHandler(OnServiceFaulted);
// Open the ServiceHostBase to create listeners and start listening for messages.
serviceHost.Open();
}
public static void OnServiceFaulted(object sender, EventArgs e)
{
Console.WriteLine("Service Faulted");
}
// Host the service within this EXE console application.
public static void Main()
{
OrderProcessorService.StartService();
// The service can now be accessed.
Console.WriteLine("The service is ready.");
Console.WriteLine("Press <ENTER> to stop the service");
Console.ReadLine();
Console.WriteLine("Exiting service");
}
}
PoisonErrorBehaviorAttribute
安装 PoisonErrorHandler
。PoisonErrorHandler
是 IErrorHandler 的一个实现。每当系统中引发异常时都将调用 IErrorHandler。在我们的实现中,我们将查找指示已发现病毒消息的 MsmqPoisonMessageException。我们从异常中检索 MSMQ 消息查找 ID 并使用 System.Messaging API 从队列中接收病毒消息,然后将病毒消息移到特定的病毒消息队列,供以后处理。因为消息可能在 ServiceModel 事务中仍处于锁定状态,我们将等待以得知读取是否成功并在不成功时再次重试。一旦将消息移到其他队列,就可以使用队列中的其他消息,这时将创建新的 ServiceHost 并打开该 ServiceHost 用于处理。下面的代码示例演示此行为:
public class PoisonErrorHandler : IErrorHandler
{
static WaitCallback orderProcessingCallback = new WaitCallback(OrderProcessorService.StartThreadProc);
public void ProvideFault(Exception error, MessageVersion version, ref Message fault)
{
// no-op -we are not interested in this.
}
// Handle poison message exception by moving the offending message out of the way for regular processing to go on.
public bool HandleError(Exception error)
{
MsmqPoisonMessageException poisonException = error as MsmqPoisonMessageException;
if (null != poisonException)
{
long lookupId = poisonException.MessageLookupId;
Console.WriteLine(" Poisoned message -message look up id = {0}", lookupId);
// Get MSMQ queue name from app settings in configuration.
System.Messaging.MessageQueue orderQueue = new System.Messaging.MessageQueue(OrderProcessorService.QueueName);
System.Messaging.MessageQueue poisonMessageQueue = new System.Messaging.MessageQueue(OrderProcessorService.PoisonQueueName);
System.Messaging.Message message = null;
// Use a new transaction scope to remove the message from the main application queue and add it to the poison queue.
// The poison message service processes messages from the poison queue.
using(TransactionScope txScope= new TransactionScope(TransactionScopeOption.RequiresNew))
{
int retryCount = 0;
while(retryCount < 3)
{
retryCount++;
try
{
// Look up the poison message using the look up id.
message = orderQueue.ReceiveByLookupId(lookupId);
if(message != null)
{
// Send the message to the poison message queue.
poisonMessageQueue.Send(message, System.Messaging.MessageQueueTransactionType.Automatic);
// complete transaction scope
txScope.Complete();
Console.WriteLine("Moved poisoned message with look up id: {0} to poison queue: {1} ", lookupId, OrderProcessorService.PoisonQueueName);
break;
}
}
catch(InvalidOperationException )
{
//Code for the case when the message may still not be available in the queue because of a race condition in transaction or
//another node in the farm may actually have taken the message.
if (retryCount < 3)
{
Console.WriteLine("Trying to move poison message but message is not available, sleeping for 10 seconds before retrying");
Thread.Sleep(TimeSpan.FromSeconds(10));
}
else
{
Console.WriteLine("Giving up on trying to move the message");
}
}
}
}
// Restart the service host.
Console.WriteLine();
Console.WriteLine("Restarting the service to process rest of the messages in the queue");
Console.WriteLine("Press <ENTER> to stop the service");
ThreadPool.QueueUserWorkItem(orderProcessingCallback);
return true;
}
return false;
}
}
服务配置包括下面的病毒消息属性:receiveRetryCount 和 receiveErrorHandling,如下面的配置文件所示。当运行在 Windows Server 2003 和 Windows XP 上时,将忽略属性 maxRetryCycles 和 retryCycleDelay。
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<appSettings>
<!-- Use appSetting to configure the MSMQ queue name. -->
<add key="queueName" value=".\private$\ServiceModelSamplesPoison" />
<add key="poisonQueueName" value=".\private$\OrderProcessorPoisonQueue" />
<add key="baseAddress" value="https://localhost:8000/orderProcessor/poisonSample"/>
</appSettings>
<system.serviceModel>
<services>
<service
name="Microsoft.ServiceModel.Samples.OrderProcessorService">
<!-- Define NetMsmqEndpoint -->
<endpoint address="net.msmq://localhost/private/ServiceModelSamplesPoison"
binding="netMsmqBinding"
bindingConfiguration="PoisonBinding"
contract="Microsoft.ServiceModel.Samples.IOrderProcessor" />
</service>
</services>
<bindings>
<netMsmqBinding>
<binding name="PoisonBinding"
receiveRetryCount="0"
maxRetryCycles="1"
retryCycleDelay="00:00:05"
receiveErrorHandling="Fault">
</binding>
</netMsmqBinding>
</bindings>
</system.serviceModel>
</configuration>
处理病毒消息队列中的消息
病毒消息服务从最终病毒消息队列中读取消息并处理这些消息。
病毒消息队列中的消息是指发送到正在处理消息的服务的消息,这可能不同于病毒消息服务终结点。因此,当病毒消息服务从队列中读取消息时,WCF 通道层可发现与终结点的不匹配之处,并且不会调度消息。在这种情况下,消息将发送到订单处理服务,但将由病毒消息服务接收。如果无论消息是否发送到不同的终结点都要继续接收消息,则我们必须添加 ServiceBehavior
来筛选地址,其中匹配标准是要匹配消息发送到的任何服务终结点。这是成功处理从病毒消息队列中读取的消息所必需的。
病毒消息服务实现本身非常类似于服务实现。它实现协定并处理订单。代码示例如下所示。
// Service class that implements the service contract.
// Added code to write output to the console window.
[ServiceBehavior(AddressFilterMode=AddressFilterMode.Any)]
public class OrderProcessorService : IOrderProcessor
{
[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
public void SubmitPurchaseOrder(PurchaseOrder po)
{
Orders.Add(po);
Console.WriteLine("Processing {0} ", po);
}
public static void OnServiceFaulted(object sender, EventArgs e)
{
Console.WriteLine("Service Faulted...exiting app");
Environment.Exit(1);
}
// Host the service within this EXE console application.
public static void Main()
{
// Create a ServiceHost for the OrderProcessorService type.
ServiceHost serviceHost = new ServiceHost(typeof(OrderProcessorService));
// Hook on to the service host faulted events.
serviceHost.Faulted += new EventHandler(OnServiceFaulted);
serviceHost.Open();
// The service can now be accessed.
Console.WriteLine("The poison message service is ready.");
Console.WriteLine("Press <ENTER> to terminate service.");
Console.WriteLine();
Console.ReadLine();
// Close the ServiceHostBase to shutdown the service.
if(serviceHost.State != CommunicationState.Faulted)
{
serviceHost.Close();
}
}
与从订单队列中读取消息的订单处理服务不同,病毒消息服务从病毒子队列中读取消息。病毒队列是主队列的子队列,其名称为“poison”,由 MSMQ 自动生成。若要访问该队列,请提供主队列名称,后接“;”并在子队列为“poison”的情况下提供子队列名称,如下面的示例配置中所示。
提示
在 MSMQ 3.0 版的示例中,病毒队列名称不是子队列,而是将消息移动到的队列。
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<system.serviceModel>
<services>
<service name="Microsoft.ServiceModel.Samples.OrderProcessorService">
<!-- Define NetMsmqEndpoint -->
<endpoint address="net.msmq://localhost/private/ServiceModelSamplesPoison;poison"
binding="netMsmqBinding"
contract="Microsoft.ServiceModel.Samples.IOrderProcessor" >
</endpoint>
</service>
</services>
</system.serviceModel>
</configuration>
当运行示例时,客户端、服务和病毒消息服务活动将显示在控制台窗口中。您可以看到服务从客户端接收消息。在每个控制台窗口中按 Enter 可以关闭服务。
服务开始运行、处理订单并随机开始终止处理。如果消息指示服务已经处理完订单,则可以再次运行客户端来发送其他消息,直到您看到服务确实已经终止某条消息。根据配置的病毒设置,在将消息移到最终病毒队列中之前将再次重试处理消息。
The service is ready.
Press <ENTER> to terminate service.
Processing Purchase Order: 0f063b71-93e0-42a1-aa3b-bca6c7a89546
Customer: somecustomer.com
OrderDetails
Order LineItem: 54 of Blue Widget @unit price: $29.99
Order LineItem: 890 of Red Widget @unit price: $45.89
Total cost of this order: $42461.56
Order status: Pending
Processing Purchase Order: 5ef9a4fa-5a30-4175-b455-2fb1396095fa
Customer: somecustomer.com
OrderDetails
Order LineItem: 54 of Blue Widget @unit price: $29.99
Order LineItem: 890 of Red Widget @unit price: $45.89
Total cost of this order: $42461.56
Order status: Pending
Aborting transaction, cannot process purchase order: 23e0b991-fbf9-4438-a0e2-20adf93a4f89
启动病毒消息服务以从病毒队列中读取病毒消息。在本示例中,病毒消息服务读取消息并处理这些消息。您可以看到病毒消息服务读取已终止并已中毒的采购订单。
The service is ready.
Press <ENTER> to terminate service.
Processing Purchase Order: 23e0b991-fbf9-4438-a0e2-20adf93a4f89
Customer: somecustomer.com
OrderDetails
Order LineItem: 54 of Blue Widget @unit price: $29.99
Order LineItem: 890 of Red Widget @unit price: $45.89
Total cost of this order: $42461.56
Order status: Pending
设置、生成和运行示例
若要生成 C# 或 Visual Basic .NET 版本的解决方案,请按照生成 Windows Communication Foundation 示例中的说明进行操作。
若要用单一计算机配置或跨计算机配置来运行示例,请更改队列名称,以映射实际的主机名而不是本地主机,并按照运行 Windows Communication Foundation 示例中的说明操作。
默认情况下对 netMsmqBinding 绑定传输启用了安全性。MsmqAuthenticationMode 和 MsmqProtectionLevel 这两个属性共同确定了传输安全性的类型。默认情况下,身份验证模式设置为 Windows,保护级别设置为 Sign。MSMQ 必须是域的成员才可以提供身份验证和签名功能。如果在不是域成员的计算机上运行此示例,则会接收以下错误:“用户的内部消息队列证书不存在”。
在加入到工作组的计算机上运行此示例
如果计算机不是域成员,请将身份验证模式和保护级别设置为
None
以禁用传输安全性,如下面的示例配置所示:<bindings> <netMsmqBinding> <binding name="TransactedBinding"> <security mode="None"/> </binding> </netMsmqBinding> </bindings>
确保通过设置终结点的 bindingConfiguration 属性将终结点与绑定关联。
确保在运行示例前更改 PoisonMessageServer、服务器和客户端上的配置。
提示
将 security mode 设置为 None 等效于将 MsmqAuthenticationMode、MsmqProtectionLevel 和 Message 安全设置为 None。
Send comments about this topic to Microsoft.
© 2007 Microsoft Corporation. All rights reserved.