MSMQ 3.0 での有害メッセージ処理
このサンプルでは、サービスで有害メッセージの処理を実行する方法を示します。このサンプルは、「トランザクション MSMQ バインディング」のサンプルに基づいています。このサンプルでは、netMsmqBinding を使用しています。サービスは自己ホスト型コンソール アプリケーションであるので、キューに置かれたメッセージをサービスが受信するようすを観察できます。
キュー通信では、クライアントはサービスとの通信にキューを使用します。厳密には、クライアントはメッセージをキューに送信します。サービスは、メッセージをキューから受信します。このため、キューを使用する通信では、サービスとクライアントは同時に実行されていなくてもかまいません。
有害メッセージとは、メッセージを読み取るサービスがメッセージを処理できないためメッセージの読み取りが行われるトランザクションを終了する場合に、キューから繰り返し読み取られるメッセージのことです。そのような場合、メッセージは再試行されます。メッセージに問題がある場合、この再試行は理論上、永久に継続します。これは、キューから読み取って、サービス操作を起動するトランザクションを使用する場合にのみ発生することがある点に注意してください。
MSMQ のバージョンによって、NetMsmqBinding がサポートする有害メッセージの検出が制限されている場合と、制限されていない場合があります。メッセージが有害として検出されたら、いくつかの方法で処理できます。
このサンプルでは、Windows Server 2003 プラットフォームと Windows XP プラットフォームに用意されている、制限のある有害メッセージ機能と、Windows Vista に用意されている、制限のない有害メッセージ機能を示します。どちらのサンプルでも、目的はキューから取り出した有害メッセージを、有害メッセージ サービスによりサービスを提供可能な別のキューに移動することです。
MSMQ Version 3.0 (Windows Server 2003 および Windows XP) の有害メッセージ処理のサンプル
MSMQ バージョン 3.0 では、有害メッセージ サブキューがサポートされていません。そのため、このサンプルでは、有害メッセージを保持するキューを作成します。これは、データの損失を防ぐために、MSMQ バージョン 3.0 で有害メッセージを処理するための推奨方法です。
Windows Server 2003 および Windows XP での有害メッセージ検出は、1 つのプロパティ ReceiveRetryCount の構成に制限されます。ReceiveRetryCount は、メッセージをキューから繰り返し読み取ることができる回数です。Windows Communication Foundation (WCF) では、この回数はメモリに保持されます。この回数に達すると (つまり、メッセージが有害になると)、有害メッセージの処理方法に移行します。バインディングの列挙体 ReceiveErrorHandling プロパティに、使用可能な値が示されます。列挙値は、次のとおりです。
Fault (既定値): リスナとサービス ホストをエラーにします。
Drop: メッセージを破棄します。
Move: メッセージを有害メッセージ サブキューに移動します。この値は、Windows Vista でのみ使用できます。
Reject: メッセージを送信者の配信不能キューに戻すことにより、メッセージを拒否します。この値は、Windows Vista でのみ使用できます。
この中で、MSMQ バージョン 3.0 を使用するときは、Fault および Drop だけが有効な値です。このサンプルは、MSMQ バージョン 3.0 を使用して有害メッセージを処理する方法を示しています。また、このサンプルは MSMQ バージョン 4.0 でも実行でき、MSMQ バージョン 3.0 で実行するのと同様に機能します。
サービス コントラクトは IOrderProcessor
です。これは、キューでの使用に適した一方向サービスを定義します。
[ServiceContract(Namespace="http://Microsoft.ServiceModel.Samples")]
public interface IOrderProcessor
{
[OperationContract(IsOneWay = true)]
void SubmitPurchaseOrder(PurchaseOrder po);
}
サービス操作により、命令を処理していることを示すメッセージが表示されます。有害メッセージ機能のデモンストレーションを行うため、SubmitPurchaseOrder
サービス操作は例外をスローして、サービスのランダム起動に基づきトランザクションをロールバックします。これにより、メッセージがキューに戻ります。最終的に、メッセージは有害とマークされます。有害メッセージでは、構成はエラーと設定されます。メッセージが有害になると、キューに入れられたチャネルが、有害メッセージの MsmqPoisonMessageException を含む MessageLookupId をスローし、ServiceHost をエラーに設定します。エラー ハンドラをこの例外のモニタに付加して、適切な処理を行います。エラー ハンドラを追加するには、PoisonErrorBehaviorAttribute
を作成して、OrderProcessorService
にこの動作を付けます。
// Service class that implements the service contract.
// Added code to write output to the console window.
[PoisonErrorBehavior]
public class OrderProcessorService : IOrderProcessor
{
static Random r = new Random(137);
public static string QueueName;
public static string PoisonQueueName;
[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
public void SubmitPurchaseOrder(PurchaseOrder po)
{
int randomNumber = r.Next(10);
if (randomNumber % 2 == 0)
{
Orders.Add(po);
Console.WriteLine("Processing {0} ", po);
}
else
{
Console.WriteLine("Aborting transaction, cannot process purchase order: " + po.PONumber);
Console.WriteLine();
throw new Exception("Cannot process purchase order: " + po.PONumber);
}
}
public static void StartThreadProc(object stateInfo)
{
StartService();
}
public static void StartService()
{
// Get MSMQ queue name from app settings in configuration
QueueName = ConfigurationManager.AppSettings["queueName"];
// Get MSMQ queue name for the final poison queue
PoisonQueueName = ConfigurationManager.AppSettings["poisonQueueName"];
// Create the transacted MSMQ queue if necessary.
if (!System.Messaging.MessageQueue.Exists(QueueName))
System.Messaging.MessageQueue.Create(QueueName, true);
// Create the transacted poison message MSMQ queue if necessary.
if (!System.Messaging.MessageQueue.Exists(PoisonQueueName))
System.Messaging.MessageQueue.Create(PoisonQueueName, true);
// Get the base address that is used to listen for WS-MetaDataExchange requests
// This is useful to generate a proxy for the client
string baseAddress = ConfigurationManager.AppSettings["baseAddress"];
// Create a ServiceHost for the OrderProcessorService type.
ServiceHost serviceHost = new ServiceHost(typeof(OrderProcessorService), new Uri(baseAddress));
// Hook on to the service host faulted events
serviceHost.Faulted += new EventHandler(OnServiceFaulted);
// Open the ServiceHostBase to create listeners and start listening for messages.
serviceHost.Open();
}
public static void OnServiceFaulted(object sender, EventArgs e)
{
Console.WriteLine("Service Faulted");
}
// Host the service within this EXE console application.
public static void Main()
{
OrderProcessorService.StartService();
// The service can now be accessed.
Console.WriteLine("The service is ready.");
Console.WriteLine("Press <ENTER> to stop the service");
Console.ReadLine();
Console.WriteLine("Exiting service");
}
}
PoisonErrorBehaviorAttribute
は、PoisonErrorHandler
をインストールします。PoisonErrorHandler
は、IErrorHandler の実装です。システムで例外がスローされると必ず IErrorHandler が呼び出されます。この実装で、有害メッセージが見つかったことを示す MsmqPoisonMessageException を検索します。例外から MSMQ メッセージの検索 ID を取得します。System.Messaging API を使用してキューから有害メッセージを受け取り、後で処理するためにそれを特定の有害メッセージ キューに移動します。メッセージが引き続き ServiceModel トランザクションでロックされている場合があるため、待機して読み取りが成功したかどうかを確認し、再試行します。メッセージが別のキューに移動されると、キュー内の残りのメッセージを使用できます。新しい ServiceHost が作成されて、処理するために開かれます。次のコード例に、この動作を示します。
public class PoisonErrorHandler : IErrorHandler
{
static WaitCallback orderProcessingCallback = new WaitCallback(OrderProcessorService.StartThreadProc);
public void ProvideFault(Exception error, MessageVersion version, ref Message fault)
{
// no-op -we are not interested in this.
}
// Handle poison message exception by moving the offending message out of the way for regular processing to go on.
public bool HandleError(Exception error)
{
MsmqPoisonMessageException poisonException = error as MsmqPoisonMessageException;
if (null != poisonException)
{
long lookupId = poisonException.MessageLookupId;
Console.WriteLine(" Poisoned message -message look up id = {0}", lookupId);
// Get MSMQ queue name from app settings in configuration.
System.Messaging.MessageQueue orderQueue = new System.Messaging.MessageQueue(OrderProcessorService.QueueName);
System.Messaging.MessageQueue poisonMessageQueue = new System.Messaging.MessageQueue(OrderProcessorService.PoisonQueueName);
System.Messaging.Message message = null;
// Use a new transaction scope to remove the message from the main application queue and add it to the poison queue.
// The poison message service processes messages from the poison queue.
using(TransactionScope txScope= new TransactionScope(TransactionScopeOption.RequiresNew))
{
int retryCount = 0;
while(retryCount < 3)
{
retryCount++;
try
{
// Look up the poison message using the look up id.
message = orderQueue.ReceiveByLookupId(lookupId);
if(message != null)
{
// Send the message to the poison message queue.
poisonMessageQueue.Send(message, System.Messaging.MessageQueueTransactionType.Automatic);
// complete transaction scope
txScope.Complete();
Console.WriteLine("Moved poisoned message with look up id: {0} to poison queue: {1} ", lookupId, OrderProcessorService.PoisonQueueName);
break;
}
}
catch(InvalidOperationException )
{
//Code for the case when the message may still not be available in the queue because of a race condition in transaction or
//another node in the farm may actually have taken the message.
if (retryCount < 3)
{
Console.WriteLine("Trying to move poison message but message is not available, sleeping for 10 seconds before retrying");
Thread.Sleep(TimeSpan.FromSeconds(10));
}
else
{
Console.WriteLine("Giving up on trying to move the message");
}
}
}
}
// Restart the service host.
Console.WriteLine();
Console.WriteLine("Restarting the service to process rest of the messages in the queue");
Console.WriteLine("Press <ENTER> to stop the service");
ThreadPool.QueueUserWorkItem(orderProcessingCallback);
return true;
}
return false;
}
}
サービス構成には、receiveRetryCount と receiveErrorHandling という有害メッセージ プロパティが含まれています。次の構成ファイルを参照してください。Windows Server 2003 および Windows XP で実行されている場合、maxRetryCycles プロパティおよび retryCycleDelay プロパティは無視されます。
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<appSettings>
<!-- Use appSetting to configure the MSMQ queue name. -->
<add key="queueName" value=".\private$\ServiceModelSamplesPoison" />
<add key="poisonQueueName" value=".\private$\OrderProcessorPoisonQueue" />
<add key="baseAddress" value="https://localhost:8000/orderProcessor/poisonSample"/>
</appSettings>
<system.serviceModel>
<services>
<service
name="Microsoft.ServiceModel.Samples.OrderProcessorService">
<!-- Define NetMsmqEndpoint -->
<endpoint address="net.msmq://localhost/private/ServiceModelSamplesPoison"
binding="netMsmqBinding"
bindingConfiguration="PoisonBinding"
contract="Microsoft.ServiceModel.Samples.IOrderProcessor" />
</service>
</services>
<bindings>
<netMsmqBinding>
<binding name="PoisonBinding"
receiveRetryCount="0"
maxRetryCycles="1"
retryCycleDelay="00:00:05"
receiveErrorHandling="Fault">
</binding>
</netMsmqBinding>
</bindings>
</system.serviceModel>
</configuration>
有害メッセージ キューのメッセージの処理
有害メッセージ サービスは、最終的な有害メッセージ キューからメッセージを読み取って、それを処理します。
有害メッセージ キューのメッセージは、メッセージを処理するサービスに対応付けられたメッセージで、有害メッセージ サービスのエンドポイントとは異なる場合があります。そのため、有害メッセージ サービスがキューからメッセージを読み取ると、WCF のチャネル レイヤはエンドポイントで不一致を検出し、メッセージをディスパッチしません。この場合、メッセージは注文処理サービス宛てになりますが、有害メッセージ サービスが受信します。別のエンドポイント宛のメッセージも含めてメッセージの受信を続けるには、ServiceBehavior
を追加して、メッセージの任意の宛先サービス エンドポイントを一致条件としてアドレスをフィルタする必要があります。これは、有害メッセージ キューから読み込んだメッセージを正常に処理するために必要です。
有害メッセージ サービス実装そのものは、サービス実装と非常によく似ています。コントラクトを実装し、注文を処理します。コード例を次に示します。
// Service class that implements the service contract.
// Added code to write output to the console window.
[ServiceBehavior(AddressFilterMode=AddressFilterMode.Any)]
public class OrderProcessorService : IOrderProcessor
{
[OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
public void SubmitPurchaseOrder(PurchaseOrder po)
{
Orders.Add(po);
Console.WriteLine("Processing {0} ", po);
}
public static void OnServiceFaulted(object sender, EventArgs e)
{
Console.WriteLine("Service Faulted...exiting app");
Environment.Exit(1);
}
// Host the service within this EXE console application.
public static void Main()
{
// Create a ServiceHost for the OrderProcessorService type.
ServiceHost serviceHost = new ServiceHost(typeof(OrderProcessorService));
// Hook on to the service host faulted events.
serviceHost.Faulted += new EventHandler(OnServiceFaulted);
serviceHost.Open();
// The service can now be accessed.
Console.WriteLine("The poison message service is ready.");
Console.WriteLine("Press <ENTER> to terminate service.");
Console.WriteLine();
Console.ReadLine();
// Close the ServiceHostBase to shutdown the service.
if(serviceHost.State != CommunicationState.Faulted)
{
serviceHost.Close();
}
}
注文処理サービスはメッセージを注文キューから読み込みますが、有害メッセージ サービスはメッセージを有害サブキューから読み込みます。有害キューはメイン キューのサブキューであり、名前は "poison" で、MSMQ により自動生成されます。アクセスするには、メイン キュー名の後ろに ";" を追加し、その後ろにサブキュー名 (この場合は "poison") を指定します。次のサンプル構成を参照してください。
メモ : |
---|
MSMQ v3.0 用のサンプルでは、有害キュー名はサブキューではなく、メッセージの移動先キューになります。 |
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<system.serviceModel>
<services>
<service name="Microsoft.ServiceModel.Samples.OrderProcessorService">
<!-- Define NetMsmqEndpoint -->
<endpoint address="net.msmq://localhost/private/ServiceModelSamplesPoison;poison"
binding="netMsmqBinding"
contract="Microsoft.ServiceModel.Samples.IOrderProcessor" >
</endpoint>
</service>
</services>
</system.serviceModel>
</configuration>
サンプルを実行すると、クライアント、サービス、および有害メッセージ サービスのアクティビティがコンソール ウィンドウに表示されます。サービスがクライアントから受信したメッセージを表示できます。いずれかのコンソール ウィンドウで Enter キーを押すと、サービスがシャットダウンされます。
サービスが実行を開始し、注文を処理し、処理の終了をランダムに開始します。メッセージに注文処理の完了が示されていた場合、クライアントをもう一度実行して、サービスが実際にメッセージを終了したことを確認するまで別のメッセージを送信できます。構成されている有害設定に基づき、メッセージは 1 度処理を試行されてから、順番として最後に存在する有害キューに移されます。
The service is ready.
Press <ENTER> to terminate service.
Processing Purchase Order: 0f063b71-93e0-42a1-aa3b-bca6c7a89546
Customer: somecustomer.com
OrderDetails
Order LineItem: 54 of Blue Widget @unit price: $29.99
Order LineItem: 890 of Red Widget @unit price: $45.89
Total cost of this order: $42461.56
Order status: Pending
Processing Purchase Order: 5ef9a4fa-5a30-4175-b455-2fb1396095fa
Customer: somecustomer.com
OrderDetails
Order LineItem: 54 of Blue Widget @unit price: $29.99
Order LineItem: 890 of Red Widget @unit price: $45.89
Total cost of this order: $42461.56
Order status: Pending
Aborting transaction, cannot process purchase order: 23e0b991-fbf9-4438-a0e2-20adf93a4f89
有害メッセージ サービスを起動して、有害メッセージを有害キューから読み込みます。この例では、有害メッセージ サービスはメッセージを読み込んで処理します。終了され有害指定された発注書が有害メッセージ サービスによって読み込まれるのを確認できます。
The service is ready.
Press <ENTER> to terminate service.
Processing Purchase Order: 23e0b991-fbf9-4438-a0e2-20adf93a4f89
Customer: somecustomer.com
OrderDetails
Order LineItem: 54 of Blue Widget @unit price: $29.99
Order LineItem: 890 of Red Widget @unit price: $45.89
Total cost of this order: $42461.56
Order status: Pending
サンプルを設定、ビルド、および実行するには
「Windows Communication Foundation サンプルの 1 回限りのセットアップの手順」が実行済みであることを確認します。
ソリューションの C# 版または Visual Basic .NET 版をビルドするには、「Windows Communication Foundation サンプルのビルド」の手順に従います。
サンプルを単一コンピュータ構成または複数コンピュータ構成で実行するには、キュー名を localhost から実際のホスト名に変更し、「Windows Communication Foundation サンプルの実行」の手順に従います。
netMsmqBinding バインディング トランスポートを使用する場合の既定では、セキュリティが有効です。トランスポート セキュリティの種類は、MsmqAuthenticationMode と MsmqProtectionLevel の 2 つのプロパティで決まります。既定では、認証モードは Windows に、保護レベルは Sign に、それぞれ設定されます。MSMQ で認証および署名機能を実行するには、MSMQ がドメインに属している必要があります。このサンプルをドメインに属していないコンピュータで実行すると、「User's internal message queuing certificate does not exist」というエラーが表示されます。
ワークグループに参加しているコンピュータでこのサンプルを実行するには
ドメインに属していないコンピュータを使用する場合は、トランスポート セキュリティをオフにします。オフにするには、認証モードとセキュリティ レベルを
None
に設定します。サンプル構成を次に示します。<bindings> <netMsmqBinding> <binding name="TransactedBinding"> <security mode="None"/> </binding> </netMsmqBinding> </bindings>
エンドポイントの bindingConfiguration 属性が設定されて、エンドポイントとバインディングとが関連付けられていることを確認します。
サンプルを実行する前に、PoisonMessageServer、サーバー、およびクライアントの構成を変更してください。
メモ : security mode を None に設定することは、MsmqAuthenticationMode、MsmqProtectionLevel、および Message のセキュリティを None に設定することに相当します。
Copyright © 2007 by Microsoft Corporation.All rights reserved.