配信不能キューを使用したメッセージ転送エラー処理

キューに置かれたメッセージは、配信に失敗する可能性があります。 配信に失敗したメッセージは、配信不能キューに記録されます。 配信の失敗は、ネットワーク エラー、キューが削除されている、キューがいっぱいになっている、認証エラー、配信が時間どおりに行われなかったなど、さまざまな理由で生じる可能性があります。

キューに置かれたメッセージは、受信側のアプリケーションでタイムリーに読み取られないと、長時間キューに残ることがあります。 時間依存のメッセージでは、このような動作が適切でない場合があります。 時間依存のメッセージでは、メッセージをキューに格納しておくことができる期間を示す TTL (Time to Live) プロパティが、キューに置かれたバインディングに設定されおり、この期間を超えると、メッセージの有効期限が切れます。 期限切れのメッセージは、配信不能キューと呼ばれる特別なキューに送信されます。 また、キューのクォータの超過、認証エラーなどの理由で、メッセージが配信不能キューに置かれる場合もあります。

一般に、アプリケーションには、配信不能キューのメッセージとエラーの理由を読み取るための補正ロジックが記述されています。 補正ロジックはエラーの原因に依存します。 たとえば、認証エラーの場合は、メッセージに添付された証明書を修正し、メッセージを再送信できます。 また、ターゲット キューのクォータに達したために配信が失敗した場合は、クォータの問題が解決されていることを期待して配信を再試行できます。

ほとんどのキュー システムには、そのシステムから配信できなかったすべてのメッセージを格納するための、システム全体の配信不能キューがあります。 メッセージ キュー (MSMQ) には、システム全体の配信不能キューが 2 種類用意されています。1 つはトランザクション キューへの配信に失敗したメッセージを格納するトランザクション システム全体の配信不能キューで、もう 1 つは非トランザクション キューへの配信に失敗したメッセージを格納する非トランザクション システム全体の配信不能キューです。 2 つのクライアントで 2 つの異なるサービスにメッセージを送信しているために、WCF の異なるキューで、送信に同じ MSMQ サービスを共有している場合は、システム配信不能キューでメッセージが混在する可能性があります。 これは、必ずしも最適であるとは言えません。 場合によっては (たとえば、セキュリティ上の理由で)、一方のクライアントが、もう一方のクライアントのメッセージを配信不能キューから読み取ることができないようにする必要があるからです。 また、共有された配信不能キューでは、クライアントがキューを参照して、送信したメッセージを検索する必要もありますが、配信不能キューに置かれているメッセージの数によっては、これは極めて大きな負荷になる可能性があります。 そのため、Windows Vista の WCFNetMsmqBindingMsmqIntegrationBinding,、MSMQ などには、カスタム配信不能キュー (アプリケーション固有の配信不能キューと呼ばれることもあります) が用意されています。

カスタム配信不能キューでは、同じ MSMQ サービスを共有してメッセージを送信するクライアントをそれぞれ分離できます。

Windows Communication Foundation (WCF) の場合、Windows Server 2003 と Windows XP では、キューに置かれたすべてのクライアント アプリケーションに対して、システム全体で 1 つの配信不能キューが提供されます。 一方、Windows Vista では、WCF により、キューに置かれたクライアント アプリケーションごとに配信不能キューが提供されます。

配信不能キューの使用の指定

配信不能キューは、送信元アプリケーションのキュー マネージャーに存在します。 ここには、有効期限が切れたメッセージと転送または配信に失敗したメッセージが格納されます。

バインディングには、次の配信不能キュー プロパティがあります。

配信不能キューからのメッセージの読み取り

配信不能キューからメッセージを読み取るアプリケーションは、アプリケーション キューから読み取る WCF サービスとよく似ていますが、次のようなわずかな違いがあります。

  • トランザクション システムの配信不能キューからメッセージを読み取るには、URI (Uniform Resource Identifier) を net.msmq://localhost/system$;DeadXact という形式にする必要があります。

  • 非トランザクション システムの配信不能キューからメッセージを読み取る場合、URI は、net.msmq://localhost/system$;DeadLetter という形式にする必要があります。

  • カスタム配信不能キューからメッセージを読み取るには、URI を net.msmq://localhost/private/<custom-dlq-name> という形式にする必要があります。ここで、custom-dlq-name は、カスタム配信不能キューの名前です。

キューのアドレス指定方法の詳細については、「サービス エンドポイントとキューのアドレス指定」を参照してください。

受信側の WCF スタックでは、サービスがリッスンしているアドレスとメッセージのアドレスが照合されます。 アドレスが一致する場合、メッセージはディスパッチされますが、一致しない場合はディスパッチされません。 これにより、配信不能キューから読み取るときに問題が生じる可能性があります。一般に、配信不能キュー内のメッセージは該当サービスにアドレス指定され、配信不能キュー サービスにアドレス指定されないからです。 そのため、配信不能キューから読み取るサービスは、ServiceBehavior アドレス フィルターをインストールし、受信者とは無関係にキュー内のすべてのメッセージを一致させるようスタックに指示する必要があります。 具体的には、ServiceBehavior パラメーターを持つ Any を、配信不能キューからメッセージを読み取るサービスに追加する必要があります。

配信不能キューの有害メッセージの処理

配信不能キューでは、条件付きで有害メッセージを処理できます。 システム キューからサブキューを作成できないため、システム配信不能キューから読み取るときは、ReceiveErrorHandlingMove に設定できません。 カスタム配信不能キューから読み取る場合は、サブキューを使用できるので、Move が有害メッセージに対する有効な処置であることに注意してください。

ReceiveErrorHandlingReject に設定しているときにカスタム配信不能キューから読み取った場合、有害メッセージはシステム配信不能キューに置かれます。 システム配信不能キューから読み取ると、有害メッセージは破棄 (削除) されます。 MSMQ のシステム配信不能キューから拒否すると、メッセージは破棄 (削除) されます。

次の例は、配信不能キューを作成する方法と、配信不能キューを使用して期限切れのメッセージを処理する方法を示しています。 この例は、「方法: WCF エンドポイントを使用してキューに置かれたメッセージを交換する」の例に基づいています。 この例では、アプリケーションごとの配信不能キューを使用する発注書処理サービスにクライアント コードを書き込む方法を示します。 また、配信不能キューのメッセージを処理する方法も示します。

以下は、アプリケーションごとの配信不能キューを指定するクライアントのコードです。

using System;
using System.ServiceModel.Channels;
using System.Configuration;
//using System.Messaging;
using System.ServiceModel;
using System.Transactions;

namespace Microsoft.ServiceModel.Samples
{
    
    //The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.

    //Client implementation code.
    class Client
    {
        static void Main()
        {
            // Get MSMQ queue name from appsettings in configuration.
            string deadLetterQueueName = ConfigurationManager.AppSettings["deadLetterQueueName"];

            // Create the transacted MSMQ queue for storing dead message if necessary.
            if (!System.Messaging.MessageQueue.Exists(deadLetterQueueName))
                System.Messaging.MessageQueue.Create(deadLetterQueueName, true);

            OrderProcessorClient client = new OrderProcessorClient("OrderProcessorEndpoint");
        try
            {	

                // Create the purchase order.
                PurchaseOrder po = new PurchaseOrder();
                po.CustomerId = "somecustomer.com";
                po.PONumber = Guid.NewGuid().ToString();

                PurchaseOrderLineItem lineItem1 = new PurchaseOrderLineItem();
                lineItem1.ProductId = "Blue Widget";
                lineItem1.Quantity = 54;
                lineItem1.UnitCost = 29.99F;

                PurchaseOrderLineItem lineItem2 = new PurchaseOrderLineItem();
                lineItem2.ProductId = "Red Widget";
                lineItem2.Quantity = 890;
                lineItem2.UnitCost = 45.89F;

                po.orderLineItems = new PurchaseOrderLineItem[2];
                po.orderLineItems[0] = lineItem1;
                po.orderLineItems[1] = lineItem2;

                //Create a transaction scope.
                using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
                {
                    // Make a queued call to submit the purchase order.
                    client.SubmitPurchaseOrder(po);
                    // Complete the transaction.
                    scope.Complete();
                }

                client.Close();
            }
            catch(TimeoutException timeout)
            {
        Console.WriteLine(timeout.Message);
                client.Abort();
        }
            catch(CommunicationException conexcp)
            {
        Console.WriteLine(conexcp.Message);
                client.Abort();
        }

            Console.WriteLine();
            Console.WriteLine("Press <ENTER> to terminate client.");
            Console.ReadLine();
        }
    }
}
Imports System.ServiceModel.Channels
Imports System.Configuration
'using System.Messaging;
Imports System.ServiceModel
Imports System.Transactions

Namespace Microsoft.ServiceModel.Samples

    'The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.

    'Client implementation code.
    Friend Class Client
        Shared Sub Main()
            ' Get MSMQ queue name from appsettings in configuration.
            Dim deadLetterQueueName As String = ConfigurationManager.AppSettings("deadLetterQueueName")

            ' Create the transacted MSMQ queue for storing dead message if necessary.
            If (Not System.Messaging.MessageQueue.Exists(deadLetterQueueName)) Then
                System.Messaging.MessageQueue.Create(deadLetterQueueName, True)
            End If


            Dim client As New OrderProcessorClient("OrderProcessorEndpoint")
            Try


                ' Create the purchase order.
                Dim po As New PurchaseOrder()
                po.CustomerId = "somecustomer.com"
                po.PONumber = Guid.NewGuid().ToString()

                Dim lineItem1 As New PurchaseOrderLineItem()
                lineItem1.ProductId = "Blue Widget"
                lineItem1.Quantity = 54
                lineItem1.UnitCost = 29.99F

                Dim lineItem2 As New PurchaseOrderLineItem()
                lineItem2.ProductId = "Red Widget"
                lineItem2.Quantity = 890
                lineItem2.UnitCost = 45.89F

                po.orderLineItems = New PurchaseOrderLineItem(1) {}
                po.orderLineItems(0) = lineItem1
                po.orderLineItems(1) = lineItem2

                'Create a transaction scope.
                Using scope As New TransactionScope(TransactionScopeOption.Required)
                    ' Make a queued call to submit the purchase order.
                    client.SubmitPurchaseOrder(po)
                    ' Complete the transaction.
                    scope.Complete()
                End Using


                client.Close()
            Catch timeout As TimeoutException
                Console.WriteLine(timeout.Message)
                client.Abort()
            Catch conexcp As CommunicationException
                Console.WriteLine(conexcp.Message)
                client.Abort()
            End Try

            Console.WriteLine()
            Console.WriteLine("Press <ENTER> to terminate client.")
            Console.ReadLine()
        End Sub
    End Class
End Namespace

以下は、クライアント構成ファイルのコードです。

以下は、配信不能キューのメッセージを処理するサービスのコードです。

using System;
using System.ServiceModel.Description;
using System.Configuration;
using System.Messaging;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Transactions;

namespace Microsoft.ServiceModel.Samples
{
    // Define a service contract.
    [ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples")]
    public interface IOrderProcessor
    {
        [OperationContract(IsOneWay = true)]
        void SubmitPurchaseOrder(PurchaseOrder po);
    }

    // Service class that implements the service contract.
    // Added code to write output to the console window
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Single, AddressFilterMode = AddressFilterMode.Any)]
    public class PurchaseOrderDLQService : IOrderProcessor
    {
        OrderProcessorClient orderProcessorService;
        public PurchaseOrderDLQService()
        {
            orderProcessorService = new OrderProcessorClient("OrderProcessorEndpoint");
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void SimpleSubmitPurchaseOrder(PurchaseOrder po)
        {
            Console.WriteLine("Submitting purchase order did not succeed ", po);
            MsmqMessageProperty mqProp = OperationContext.Current.IncomingMessageProperties[MsmqMessageProperty.Name] as MsmqMessageProperty;

            Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus);
            Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure);
            Console.WriteLine();
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void SubmitPurchaseOrder(PurchaseOrder po)
        {
            Console.WriteLine("Submitting purchase order did not succeed ", po);
            MsmqMessageProperty mqProp = OperationContext.Current.IncomingMessageProperties[MsmqMessageProperty.Name] as MsmqMessageProperty;

            Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus);
            Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure);
            Console.WriteLine();

            // Resend the message if timed out.
            if (mqProp.DeliveryFailure == DeliveryFailure.ReachQueueTimeout ||
                mqProp.DeliveryFailure == DeliveryFailure.ReceiveTimeout)
            {
                // Re-send.
                Console.WriteLine("Purchase order Time To Live expired");
                Console.WriteLine("Trying to resend the message");

                // Reuse the same transaction used to read the message from dlq to enqueue the message to the application queue.
                orderProcessorService.SubmitPurchaseOrder(po);
                Console.WriteLine("Purchase order resent");
            }
        }

        // Host the service within this EXE console application.
        public static void Main()
        {
            // Create a ServiceHost for the PurchaseOrderDLQService type.
            using (ServiceHost serviceHost = new ServiceHost(typeof(PurchaseOrderDLQService)))
            {
                // Open the ServiceHostBase to create listeners and start listening for messages.
                serviceHost.Open();

                // The service can now be accessed.
                Console.WriteLine("The dead letter service is ready.");
                Console.WriteLine("Press <ENTER> to terminate service.");
                Console.WriteLine();
                Console.ReadLine();

                // Close the ServiceHostBase to shutdown the service.
                serviceHost.Close();
            }
        }
    }
}

Imports System.ServiceModel.Description
Imports System.Configuration
Imports System.Messaging
Imports System.ServiceModel
Imports System.ServiceModel.Channels
Imports System.Transactions

Namespace Microsoft.ServiceModel.Samples
    ' Define a service contract. 
    <ServiceContract(Namespace:="http://Microsoft.ServiceModel.Samples")> _
    Public Interface IOrderProcessor
        <OperationContract(IsOneWay:=True)> _
        Sub SubmitPurchaseOrder(ByVal po As PurchaseOrder)
    End Interface

    ' Service class that implements the service contract.
    ' Added code to write output to the console window
    <ServiceBehavior(InstanceContextMode:=InstanceContextMode.Single, ConcurrencyMode:=ConcurrencyMode.Single, AddressFilterMode:=AddressFilterMode.Any)> _
    Public Class PurchaseOrderDLQService
        Implements IOrderProcessor
        Private orderProcessorService As OrderProcessorClient
        Public Sub New()
            orderProcessorService = New OrderProcessorClient("OrderProcessorEndpoint")
        End Sub

        <OperationBehavior(TransactionScopeRequired:=True, TransactionAutoComplete:=True)> _
        Public Sub SimpleSubmitPurchaseOrder(ByVal po As PurchaseOrder)
            Console.WriteLine("Submitting purchase order did not succeed ", po)
            Dim mqProp As MsmqMessageProperty = TryCast(OperationContext.Current.IncomingMessageProperties(MsmqMessageProperty.Name), MsmqMessageProperty)

            Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus)
            Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure)
            Console.WriteLine()
        End Sub

        <OperationBehavior(TransactionScopeRequired:=True, TransactionAutoComplete:=True)> _
        Public Sub SubmitPurchaseOrder(ByVal po As PurchaseOrder) Implements IOrderProcessor.SubmitPurchaseOrder
            Console.WriteLine("Submitting purchase order did not succeed ", po)
            Dim mqProp As MsmqMessageProperty = TryCast(OperationContext.Current.IncomingMessageProperties(MsmqMessageProperty.Name), MsmqMessageProperty)

            Console.WriteLine("Message Delivery Status: {0} ", mqProp.DeliveryStatus)
            Console.WriteLine("Message Delivery Failure: {0}", mqProp.DeliveryFailure)
            Console.WriteLine()

            ' Resend the message if timed out.
            If mqProp.DeliveryFailure = DeliveryFailure.ReachQueueTimeout OrElse mqProp.DeliveryFailure = DeliveryFailure.ReceiveTimeout Then
                ' Re-send.
                Console.WriteLine("Purchase order Time To Live expired")
                Console.WriteLine("Trying to resend the message")

                ' Reuse the same transaction used to read the message from dlq to enqueue the message to the application queue.
                orderProcessorService.SubmitPurchaseOrder(po)
                Console.WriteLine("Purchase order resent")
            End If
        End Sub

        ' Host the service within this EXE console application.
        Public Shared Sub Main()
            ' Create a ServiceHost for the PurchaseOrderDLQService type.
            Using serviceHost As New ServiceHost(GetType(PurchaseOrderDLQService))
                ' Open the ServiceHostBase to create listeners and start listening for messages.
                serviceHost.Open()

                ' The service can now be accessed.
                Console.WriteLine("The dead letter service is ready.")
                Console.WriteLine("Press <ENTER> to terminate service.")
                Console.WriteLine()
                Console.ReadLine()

                ' Close the ServiceHostBase to shutdown the service.
                serviceHost.Close()
            End Using
        End Sub
    End Class
End Namespace

以下は、配信不能キュー サービス構成ファイルのコードです。

関連項目