Grouping Queued Messages in a Session

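Windows Communication Foundation (WCF) provides a session that lets you group a set of related messages so that a single receiving application processes them. Messages that are part of a session must be part of the same transaction. Because all messages belong to the same transaction, the entire session is rolled back if one message fails to be processed. Sessions behave similarly with respect to dead-letter queues and poison queues. The Time to Live (TTL) property set on a queued binding configured for sessions applies to the session as a whole. If only some of the messages in the session are sent before the TTL expires, the entire session is placed in the dead-letter queue. Likewise, when messages in a session cannot be delivered from the application queue to the application, the entire session is placed in the poison queue (if available).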
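As a rough illustration of where these settings live, the following sketch configures a NetMsmqBinding in code. The five-minute TTL, the retry values, and the use of NetMsmqSecurityMode.None are assumptions chosen for the example, not values taken from this article. ReceiveErrorHandling.Move assumes a version of MSMQ that supports poison subqueues.

```csharp
using System;
using System.ServiceModel;

class SessionBindingSketch
{
    // Builds a queued binding; with a session contract, these settings
    // apply to each session as a whole rather than to single messages.
    static NetMsmqBinding CreateBinding()
    {
        // Assumption: no transport security, for example on a workgroup test machine.
        NetMsmqBinding binding = new NetMsmqBinding(NetMsmqSecurityMode.None);

        // Sessions rely on transactional, exactly-once delivery (this is the default).
        binding.ExactlyOnce = true;

        // Assumption: a five-minute TTL for the whole session.
        binding.TimeToLive = TimeSpan.FromMinutes(5);

        // Expired sessions are placed in the system dead-letter queue.
        binding.DeadLetterQueue = DeadLetterQueue.System;

        // Assumption: retry a failing session a few times, then move it
        // to the poison queue instead of faulting the listener.
        binding.ReceiveRetryCount = 2;
        binding.MaxRetryCycles = 1;
        binding.RetryCycleDelay = TimeSpan.FromSeconds(30);
        binding.ReceiveErrorHandling = ReceiveErrorHandling.Move;

        return binding;
    }

    static void Main()
    {
        NetMsmqBinding binding = CreateBinding();
        Console.WriteLine("TTL: " + binding.TimeToLive);
        Console.WriteLine("Dead-letter queue: " + binding.DeadLetterQueue);
        Console.WriteLine("Poison handling: " + binding.ReceiveErrorHandling);
    }
}
```

The same properties can also be set on a netMsmqBinding element in the application configuration file, which keeps the values out of compiled code.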

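Message Grouping Example

One example where grouping messages is helpful is an order-processing application implemented as a WCF service. For instance, a client submits an order that contains a number of items. For each item, the client makes a call to the service, which results in a separate message being sent. Server A might receive the first item and server B the second. Each time an item is added, the server that processes it has to find the appropriate order and add the item to it, which is highly inefficient. The same inefficiency arises even when a single server handles all requests, because the server must keep track of every order currently being processed and determine which order each new item belongs to. Grouping all requests for a single order greatly simplifies the implementation of such an application. The client application sends all items for a single order in one session, so when the service processes the order, it processes the entire session at once.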

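Procedures

To set up a service contract to use sessions

  1. Define a service contract that requires a session. Do this with the ServiceContractAttribute attribute by specifying: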

    SessionMode=SessionMode.Required  
    
  2. Mark the operations in the contract as one-way, because these methods do not return anything. Do this with the OperationContractAttribute attribute by specifying:

    [OperationContract(IsOneWay = true)]  
    
  3. Implement the service contract and specify an InstanceContextMode of InstanceContextMode.PerSession. This instantiates the service only once for each session.

    [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]  
    
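  4. Each service operation requires a transaction. Specify this with the OperationBehaviorAttribute attribute. The operation that completes the transaction should also set TransactionAutoComplete to true.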

    [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
    
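  5. Configure an endpoint that uses the system-provided NetMsmqBinding binding.

  6. Create a transactional queue by using System.Messaging. You can also create the queue by using Message Queuing (MSMQ) or MMC. If you do, make sure the queue is transactional.

  7. Create a service host for the service by using ServiceHost.

  8. Open the service host to make the service available.

  9. Close the service host.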
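The steps above can be sketched in a single file when the endpoint is added in code rather than in configuration. This is a minimal sketch under stated assumptions, not the sample's own hosting code: the trimmed-down contract, the queue name OrderTakerSessionSketch, and the use of NetMsmqSecurityMode.None are all hypothetical choices for illustration. It needs references to System.ServiceModel and System.Messaging and a local MSMQ installation.

```csharp
using System;
using System.Messaging;
using System.ServiceModel;

// Steps 1 and 2: a session-required contract with one-way operations.
[ServiceContract(SessionMode = SessionMode.Required)]
public interface IOrderTaker
{
    [OperationContract(IsOneWay = true)]
    void AddProductLineItem(string productId, int quantity);

    [OperationContract(IsOneWay = true)]
    void EndPurchaseOrder();
}

// Step 3: one service instance per session.
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerSession)]
public class OrderTakerService : IOrderTaker
{
    // Step 4: every operation runs in the transaction; only the last one completes it.
    [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = false)]
    public void AddProductLineItem(string productId, int quantity)
    {
        Console.WriteLine("Item: " + quantity + " of " + productId);
    }

    [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
    public void EndPurchaseOrder()
    {
        Console.WriteLine("Order completed");
    }
}

class HostSketch
{
    static void Main()
    {
        // Step 6. Assumption: a private queue with this name on the local machine.
        string queuePath = @".\private$\OrderTakerSessionSketch";
        if (!MessageQueue.Exists(queuePath))
            MessageQueue.Create(queuePath, true); // true = transactional

        // Step 7: create the host.
        using (ServiceHost host = new ServiceHost(typeof(OrderTakerService)))
        {
            // Step 5: add the NetMsmqBinding endpoint in code.
            host.AddServiceEndpoint(
                typeof(IOrderTaker),
                new NetMsmqBinding(NetMsmqSecurityMode.None),
                "net.msmq://localhost/private/OrderTakerSessionSketch");

            // Step 8: start listening on the queue.
            host.Open();
            Console.WriteLine("Service is ready. Press <ENTER> to exit.");
            Console.ReadLine();

            // Step 9: shut down.
            host.Close();
        }
    }
}
```

Adding the endpoint in code keeps the sketch self-contained. Defining the endpoint in the configuration file instead lets the queue address change without recompiling.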

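To set up a client

  1. Create a transaction scope to write to the transactional queue.

  2. Create the WCF client by using the ServiceModel Metadata Utility Tool (Svcutil.exe).

  3. Place the order.

  4. Close the WCF client.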
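The client steps can also be sketched without a generated proxy. The following example swaps in ChannelFactory for the Svcutil.exe-generated client named in step 2, so it is an alternative illustration rather than the procedure itself. The contract shown here, the queue address, and NetMsmqSecurityMode.None are assumptions for the example; the contract must match the one the service actually exposes.

```csharp
using System;
using System.ServiceModel;
using System.Transactions;

// Assumption: a hand-written copy of the service's session contract.
[ServiceContract(SessionMode = SessionMode.Required)]
public interface IOrderTaker
{
    [OperationContract(IsOneWay = true)]
    void AddProductLineItem(string productId, int quantity);

    [OperationContract(IsOneWay = true)]
    void EndPurchaseOrder();
}

class ClientSketch
{
    static void Main()
    {
        // Assumption: the service listens on this private local queue.
        ChannelFactory<IOrderTaker> factory = new ChannelFactory<IOrderTaker>(
            new NetMsmqBinding(NetMsmqSecurityMode.None),
            new EndpointAddress("net.msmq://localhost/private/OrderTakerSessionSketch"));

        // Step 1: all messages in the session are written under one transaction.
        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
        {
            // Step 2 (swapped in): create the channel from the factory.
            IOrderTaker client = factory.CreateChannel();

            // Step 3: place the order; each call becomes one message in the session.
            client.AddProductLineItem("Blue Widget", 10);
            client.AddProductLineItem("Red Widget", 23);
            client.EndPurchaseOrder();

            // Step 4: close the channel inside the scope to end the session.
            ((IClientChannel)client).Close();

            // Commit only after the whole session has been handed to the queue.
            scope.Complete();
        }

        factory.Close();
    }
}
```

If any call throws before scope.Complete() runs, disposing the scope rolls the transaction back and no part of the session reaches the queue.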

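Example

Description

The following example provides the code for the IOrderTaker service and for a client that uses this service. It shows how WCF uses queued sessions to provide the grouping behavior.

Code for the Service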

// Service Code:

using System;

using System.ServiceModel.Channels;
using System.Configuration;
using System.Messaging;
using System.ServiceModel;
using System.Transactions;
using System.Text;
using System.Collections.Generic;

namespace Microsoft.ServiceModel.Samples
{
    // Define a service contract.
    [ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples", SessionMode=SessionMode.Required)]
    public interface IOrderTaker
    {
        [OperationContract(IsOneWay = true)]
        void OpenPurchaseOrder(string customerId);

        [OperationContract(IsOneWay = true)]
        void AddProductLineItem(string productId, int quantity);

        [OperationContract(IsOneWay = true)]
        void EndPurchaseOrder();
    }

    // Define the Purchase Order Line Item
    public class PurchaseOrderLineItem
    {
        static Random r = new Random(137);

        string ProductId;
        float UnitCost;
        int Quantity;

        public PurchaseOrderLineItem(string productId, int quantity)
        {
            this.ProductId = productId;
            this.Quantity = quantity;
            this.UnitCost = r.Next(10000);
        }

        public override string ToString()
        {
            String displayString = "Order LineItem: " + Quantity + " of " + ProductId + " @unit price: $" + UnitCost + "\n";
            return displayString;
        }

        public float TotalCost
        {
            get { return UnitCost * Quantity; }
        }
    }

    // Define Purchase Order
    public class PurchaseOrder
    {
        string PONumber;
        string CustomerId;
        LinkedList<PurchaseOrderLineItem> orderLineItems = new LinkedList<PurchaseOrderLineItem>();

        public PurchaseOrder(string customerId)
        {
            this.CustomerId = customerId;
            this.PONumber = Guid.NewGuid().ToString();
        }

        public void AddProductLineItem(string productId, int quantity)
        {
            orderLineItems.AddLast(new PurchaseOrderLineItem(productId, quantity));
        }

        public float TotalCost
        {
            get
            {
                float totalCost = 0;
                foreach (PurchaseOrderLineItem lineItem in orderLineItems)
                    totalCost += lineItem.TotalCost;
                return totalCost;
            }
        }

        public string Status
        {
            get
            {
                return "Pending";
            }
        }

        public override string ToString()
        {
            StringBuilder strbuf = new StringBuilder("Purchase Order: " + PONumber + "\n");
            strbuf.Append("\tCustomer: " + CustomerId + "\n");
            strbuf.Append("\tOrderDetails\n");

            foreach (PurchaseOrderLineItem lineItem in orderLineItems)
            {
                strbuf.Append("\t\t" + lineItem.ToString());
            }

            strbuf.Append("\tTotal cost of this order: $" + TotalCost + "\n");
            strbuf.Append("\tOrder status: " + Status + "\n");
            return strbuf.ToString();
        }
    }

    // Service class which implements the service contract.
    // Added code to write output to the console window
    [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
    public class OrderTakerService : IOrderTaker
    {
        PurchaseOrder po;

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = false)]
        public void OpenPurchaseOrder(string customerId)
        {
            Console.WriteLine("Creating purchase order");
            po = new PurchaseOrder(customerId);
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = false)]
        public void AddProductLineItem(string productId, int quantity)
        {
            po.AddProductLineItem(productId, quantity);
            Console.WriteLine("Product " + productId + " quantity " + quantity + " added to purchase order");
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void EndPurchaseOrder()
        {
            Console.WriteLine("Purchase Order Completed");
            Console.WriteLine();
            Console.WriteLine(po.ToString());
        }

        // Host the service within this EXE console application.
        public static void Main()
        {
            // Get MSMQ queue name from app settings in configuration
            string queueName = ConfigurationManager.AppSettings["queueName"];

            // Create the transacted MSMQ queue if necessary.
            if (!MessageQueue.Exists(queueName))
                MessageQueue.Create(queueName, true);

            // Get the base address that is used to listen for WS-MetaDataExchange requests
            string baseAddress = ConfigurationManager.AppSettings["baseAddress"];

            // Create a ServiceHost for the OrderTakerService type.
            using (ServiceHost serviceHost = new ServiceHost(typeof(OrderTakerService), new Uri(baseAddress)))
            {
                // Open the ServiceHostBase to create listeners and start listening for messages.
                serviceHost.Open();

                // The service can now be accessed.
                Console.WriteLine("The service is ready.");
                Console.WriteLine("Press <ENTER> to terminate service.");
                Console.WriteLine();
                Console.ReadLine();

                // Close the ServiceHostBase to shutdown the service.
                serviceHost.Close();
            }
        }
    }
}

' Service Code:

Imports System.ServiceModel.Channels
Imports System.Configuration
Imports System.Messaging
Imports System.ServiceModel
Imports System.Transactions
Imports System.Text
Imports System.Collections.Generic

Namespace Microsoft.ServiceModel.Samples
    ' Define a service contract. 
    <ServiceContract(Namespace:="http://Microsoft.ServiceModel.Samples", SessionMode:=SessionMode.Required)> _
    Public Interface IOrderTaker
        <OperationContract(IsOneWay:=True)> _
        Sub OpenPurchaseOrder(ByVal customerId As String)

        <OperationContract(IsOneWay:=True)> _
        Sub AddProductLineItem(ByVal productId As String, ByVal quantity As Integer)

        <OperationContract(IsOneWay:=True)> _
        Sub EndPurchaseOrder()
    End Interface

    ' Define the Purchase Order Line Item
    Public Class PurchaseOrderLineItem
        Private Shared r As New Random(137)

        Private ProductId As String
        Private UnitCost As Single
        Private Quantity As Integer

        Public Sub New(ByVal productId As String, ByVal quantity As Integer)
            Me.ProductId = productId
            Me.Quantity = quantity
            Me.UnitCost = r.Next(10000)
        End Sub

        Public Overrides Function ToString() As String
            Dim displayString As String = "Order LineItem: " & Quantity & " of " & ProductId & " @unit price: $" & UnitCost & Constants.vbLf
            Return displayString
        End Function

        Public ReadOnly Property TotalCost() As Single
            Get
                Return UnitCost * Quantity
            End Get
        End Property
    End Class

    ' Define Purchase Order
    Public Class PurchaseOrder
        Private PONumber As String
        Private CustomerId As String
        Private orderLineItems As New LinkedList(Of PurchaseOrderLineItem)()

        Public Sub New(ByVal customerId As String)
            Me.CustomerId = customerId
            Me.PONumber = Guid.NewGuid().ToString()
        End Sub

        Public Sub AddProductLineItem(ByVal productId As String, ByVal quantity As Integer)
            orderLineItems.AddLast(New PurchaseOrderLineItem(productId, quantity))
        End Sub

        Public ReadOnly Property TotalCost() As Single
            Get
                Dim totalCost_Renamed As Single = 0
                For Each lineItem In orderLineItems
                    totalCost_Renamed += lineItem.TotalCost
                Next lineItem
                Return totalCost_Renamed
            End Get
        End Property

        Public ReadOnly Property Status() As String
            Get
                Return "Pending"
            End Get
        End Property

        Public Overrides Function ToString() As String
            Dim strbuf As New StringBuilder("Purchase Order: " & PONumber & Constants.vbLf)
            strbuf.Append(Constants.vbTab & "Customer: " & CustomerId & Constants.vbLf)
            strbuf.Append(Constants.vbTab & "OrderDetails" & Constants.vbLf)

            For Each lineItem In orderLineItems
                strbuf.Append(Constants.vbTab + Constants.vbTab + lineItem.ToString())
            Next lineItem

            strbuf.Append(Constants.vbTab & "Total cost of this order: $" & TotalCost & Constants.vbLf)
            strbuf.Append(Constants.vbTab & "Order status: " & Status & Constants.vbLf)
            Return strbuf.ToString()
        End Function
    End Class


    ' Service class which implements the service contract.
    ' Added code to write output to the console window
    <ServiceBehavior(InstanceContextMode:=InstanceContextMode.PerSession)> _
    Public Class OrderTakerService
        Implements IOrderTaker
        Private po As PurchaseOrder

        <OperationBehavior(TransactionScopeRequired:=True, TransactionAutoComplete:=False)> _
        Public Sub OpenPurchaseOrder(ByVal customerId As String) Implements IOrderTaker.OpenPurchaseOrder
            Console.WriteLine("Creating purchase order")
            po = New PurchaseOrder(customerId)
        End Sub

        <OperationBehavior(TransactionScopeRequired:=True, TransactionAutoComplete:=False)> _
        Public Sub AddProductLineItem(ByVal productId As String, ByVal quantity As Integer) Implements IOrderTaker.AddProductLineItem
            po.AddProductLineItem(productId, quantity)
            Console.WriteLine("Product " & productId & " quantity " & quantity & " added to purchase order")
        End Sub

        <OperationBehavior(TransactionScopeRequired:=True, TransactionAutoComplete:=True)> _
        Public Sub EndPurchaseOrder() Implements IOrderTaker.EndPurchaseOrder
            Console.WriteLine("Purchase Order Completed")
            Console.WriteLine()
            Console.WriteLine(po.ToString())
        End Sub


        ' Host the service within this EXE console application.
        Public Shared Sub Main()
            ' Get MSMQ queue name from app settings in configuration
            Dim queueName As String = ConfigurationManager.AppSettings("queueName")

            ' Create the transacted MSMQ queue if necessary.
            If (Not MessageQueue.Exists(queueName)) Then
                MessageQueue.Create(queueName, True)
            End If


            ' Get the base address that is used to listen for WS-MetaDataExchange requests
            Dim baseAddress As String = ConfigurationManager.AppSettings("baseAddress")

            ' Create a ServiceHost for the OrderTakerService type.
            Using serviceHost As New ServiceHost(GetType(OrderTakerService), New Uri(baseAddress))
                ' Open the ServiceHostBase to create listeners and start listening for messages.
                serviceHost.Open()

                ' The service can now be accessed.
                Console.WriteLine("The service is ready.")
                Console.WriteLine("Press <ENTER> to terminate service.")
                Console.WriteLine()
                Console.ReadLine()

                ' Close the ServiceHostBase to shutdown the service.
                serviceHost.Close()
            End Using
        End Sub
    End Class
End Namespace

Code for the Client

using System;
using System.Configuration;
using System.Messaging;
using System.ServiceModel;
using System.Transactions;

namespace Microsoft.ServiceModel.Samples
{
    //The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.

    //Client implementation code.
    class Client
    {
        static void Main()
        {
            //Create a transaction scope.
            using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
            {
                // Create a proxy with given client endpoint configuration
                OrderTakerClient client = new OrderTakerClient("OrderTakerEndpoint");
                try
                {
                    // Open a purchase order
                    client.OpenPurchaseOrder("somecustomer.com");
                    Console.WriteLine("Purchase Order created");

                    // Add product line items
                    Console.WriteLine("Adding 10 quantities of blue widget");
                    client.AddProductLineItem("Blue Widget", 10);

                    Console.WriteLine("Adding 23 quantities of red widget");
                    client.AddProductLineItem("Red Widget", 23);

                    // Close the purchase order
                    Console.WriteLine("Closing the purchase order");
                    client.EndPurchaseOrder();
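                    client.Close();

                    // Complete the transaction only after the whole session has been sent.
                    scope.Complete();
                }
                catch (CommunicationException ex)
                {
                    // Abort the client. The scope is never completed, so the transaction rolls back.
                    Console.WriteLine("Order not submitted: " + ex.Message);
                    client.Abort();
                }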
            }
            Console.WriteLine();
            Console.WriteLine("Press <ENTER> to terminate client.");
            Console.ReadLine();
        }
    }
}

Imports System.Configuration
Imports System.Messaging
Imports System.ServiceModel
Imports System.Transactions

Namespace Microsoft.ServiceModel.Samples
    'The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.

    'Client implementation code.
    Friend Class Client
        Shared Sub Main()
            'Create a transaction scope.
            Using scope As New TransactionScope(TransactionScopeOption.Required)
                ' Create a proxy with given client endpoint configuration
                Dim client As New OrderTakerClient("OrderTakerEndpoint")
                Try
                    ' Open a purchase order
                    client.OpenPurchaseOrder("somecustomer.com")
                    Console.WriteLine("Purchase Order created")

                    ' Add product line items
                    Console.WriteLine("Adding 10 quantities of blue widget")
                    client.AddProductLineItem("Blue Widget", 10)

                    Console.WriteLine("Adding 23 quantities of red widget")
                    client.AddProductLineItem("Red Widget", 23)

                    ' Close the purchase order
                    Console.WriteLine("Closing the purchase order")
                    client.EndPurchaseOrder()
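                    client.Close()

                    ' Complete the transaction only after the whole session has been sent.
                    scope.Complete()
                Catch ex As CommunicationException
                    ' Abort the client. The scope is never completed, so the transaction rolls back.
                    Console.WriteLine("Order not submitted: " & ex.Message)
                    client.Abort()
                End Try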
            End Using
            Console.WriteLine()
            Console.WriteLine("Press <ENTER> to terminate client.")
            Console.ReadLine()
        End Sub
    End Class
End Namespace

See also