Grouping Queued Messages in a Session

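Windows Communication Foundation (WCF) provides sessions that let you group a set of related messages so that a single receiving application processes them. Messages that are part of a session must be part of the same transaction. Because all the messages belong to the same transaction, the entire session is rolled back if any one message fails to be processed. Sessions behave similarly with regard to dead-letter queues and poison queues. The Time to Live (TTL) property set on a queued binding that is configured for sessions applies to the session as a whole. If only some of the messages in the session are sent before the TTL expires, the entire session is placed in the dead-letter queue. Similarly, if messages in a session fail to be dispatched to the application from the application queue, the entire session is placed in the poison queue (if available).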
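The binding settings mentioned above can be sketched in C# as follows. This is a minimal illustration rather than part of the sample: the helper name `CreateSessionBinding`, the five-minute TTL, and the retry count are assumed values chosen for the example. `ReceiveErrorHandling.Move` relies on a poison subqueue, which is available only with MSMQ 4.0.

```csharp
using System;
using System.ServiceModel;

static class SessionBindingSketch
{
    // Hypothetical helper: builds a queued binding whose settings
    // apply to the session as a whole, not to individual messages.
    public static NetMsmqBinding CreateSessionBinding()
    {
        NetMsmqBinding binding = new NetMsmqBinding();

        // Transactional delivery requires ExactlyOnce (this is already the default).
        binding.ExactlyOnce = true;

        // Assumed value: if the session is not delivered within five minutes,
        // it is moved to the system dead-letter queue.
        binding.TimeToLive = TimeSpan.FromMinutes(5);
        binding.DeadLetterQueue = DeadLetterQueue.System;

        // Assumed values: retry delivery to the service twice, then move the
        // session to the poison queue (requires MSMQ 4.0).
        binding.ReceiveRetryCount = 2;
        binding.ReceiveErrorHandling = ReceiveErrorHandling.Move;

        return binding;
    }
}
```

The same properties can be set declaratively on a `netMsmqBinding` configuration element; the code form is used here only to keep the sketch in one place.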

Message Grouping Example

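Grouping messages is helpful when you implement an order-processing application as a WCF service. For example, a client submits an order that contains a number of items. The client calls the service once for each item, and each call results in a separate message being sent. Server A might receive the first item while server B receives the second. Each time an item is added, the server that processes it has to find the matching order and add the item to it, which is highly inefficient. Using a single server for all requests is still inefficient, because that server must keep track of every order currently being processed and determine which order each new item belongs to. Grouping all the requests for a single order greatly simplifies the implementation of such an application. The client application sends all the items for a single order in one session, so when the service processes the order, it processes the entire session at once.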

Procedures

To set up the service contract to use sessions

  1. Define a service contract that requires a session. Do this with the ServiceContractAttribute attribute by specifying:

    SessionMode=SessionMode.Required
    
  2. Mark the operations in the contract as one-way, because these methods do not return anything. Do this with the OperationContractAttribute attribute by specifying:

    [OperationContract(IsOneWay = true)]
    
  3. Implement the service contract and specify an InstanceContextMode of PerSession. This instantiates the service only once for each session.

    [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
    
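  4. Each service operation requires a transaction. Specify this with the OperationBehaviorAttribute attribute. The operation that completes the transaction should also set TransactionAutoComplete to true.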

    [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)] 
    
  5. Configure an endpoint that uses the system-provided NetMsmqBinding binding.

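  6. Create a transactional queue by using System.Messaging. You can also create the queue by using Message Queuing (MSMQ) or MMC. If you do, make sure that the queue is transactional.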

  7. Create a service host for the service by using ServiceHost.

  8. Open the service host to make the service available.

  9. Close the service host.
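Steps 5 through 9 can also be carried out entirely in code instead of through a configuration file. The following is a minimal sketch under stated assumptions: it reuses the IOrderTaker contract and OrderTakerService class from the example later in this topic, the class name `HostSketch` is hypothetical, and the queue path and endpoint address are simply copied from that example's configuration. It does not publish metadata, so it is not a drop-in replacement for the full sample.

```csharp
using System;
using System.Messaging;
using System.ServiceModel;
using Microsoft.ServiceModel.Samples; // IOrderTaker and OrderTakerService from the example

static class HostSketch
{
    public static void Run()
    {
        // Step 6: create the queue if it does not exist; "true" makes it transactional.
        const string queuePath = @".\private$\ServiceModelSamplesSession";
        if (!MessageQueue.Exists(queuePath))
        {
            MessageQueue.Create(queuePath, true);
        }

        // Step 7: create the service host.
        using (ServiceHost host = new ServiceHost(typeof(OrderTakerService)))
        {
            // Step 5: add an endpoint that uses NetMsmqBinding.
            host.AddServiceEndpoint(
                typeof(IOrderTaker),
                new NetMsmqBinding(),
                "net.msmq://localhost/private/ServiceModelSamplesSession");

            // Step 8: open the host so the service starts reading from the queue.
            host.Open();
            Console.WriteLine("The service is ready. Press <ENTER> to stop.");
            Console.ReadLine();

            // Step 9: close the host.
            host.Close();
        }
    }
}
```

Configuring the endpoint in code keeps the sketch short; the configuration-file approach shown in the example is usually easier to change after deployment.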

To set up the client

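  1. Create a transaction scope to write to the transactional queue.

  2. Create the WCF client by using the ServiceModel Metadata Utility Tool (Svcutil.exe).

  3. Place the order.

  4. Close the WCF client.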

Example

Description

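The following example provides the code for the IOrderTaker service and for a client that uses this service. It shows how WCF uses queued sessions to provide the grouping behavior.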

Code for the Service

' Service Code:

Imports System
Imports System.ServiceModel.Channels
Imports System.Configuration
Imports System.Messaging
Imports System.ServiceModel
Imports System.Transactions
Imports System.Text
Imports System.Collections.Generic

Namespace Microsoft.ServiceModel.Samples
    ' Define a service contract. 
    <ServiceContract(Namespace := "http://Microsoft.ServiceModel.Samples", SessionMode:=SessionMode.Required)> _
    Public Interface IOrderTaker
        <OperationContract(IsOneWay := True)> _
        Sub OpenPurchaseOrder(ByVal customerId As String)

        <OperationContract(IsOneWay := True)> _
        Sub AddProductLineItem(ByVal productId As String, ByVal quantity As Integer)

        <OperationContract(IsOneWay := True)> _
        Sub EndPurchaseOrder()
    End Interface

    ' Define the Purchase Order Line Item
    Public Class PurchaseOrderLineItem
        Private Shared r As New Random(137)

        Private ProductId As String
        Private UnitCost As Single
        Private Quantity As Integer

        Public Sub New(ByVal productId As String, ByVal quantity As Integer)
            Me.ProductId = productId
            Me.Quantity = quantity
            Me.UnitCost = r.Next(10000)
        End Sub

        Public Overrides Function ToString() As String
            Dim displayString As String = "Order LineItem: " & Quantity & " of " & ProductId & " @unit price: $" & UnitCost & Constants.vbLf
            Return displayString
        End Function

        Public ReadOnly Property TotalCost() As Single
            Get
                Return UnitCost * Quantity
            End Get
        End Property
    End Class

    ' Define Purchase Order
    Public Class PurchaseOrder
        Private PONumber As String
        Private CustomerId As String
        Private orderLineItems As New LinkedList(Of PurchaseOrderLineItem)()

        Public Sub New(ByVal customerId As String)
            Me.CustomerId = customerId
            Me.PONumber = Guid.NewGuid().ToString()
        End Sub

        Public Sub AddProductLineItem(ByVal productId As String, ByVal quantity As Integer)
            orderLineItems.AddLast(New PurchaseOrderLineItem(productId, quantity))
        End Sub

        Public ReadOnly Property TotalCost() As Single
            Get
                Dim totalCost_Renamed As Single = 0
                For Each lineItem In orderLineItems
                    totalCost_Renamed += lineItem.TotalCost
                Next lineItem
                Return totalCost_Renamed
            End Get
        End Property

        Public ReadOnly Property Status() As String
            Get
                Return "Pending"
            End Get
        End Property

        Public Overrides Function ToString() As String
            Dim strbuf As New StringBuilder("Purchase Order: " & PONumber & Constants.vbLf)
            strbuf.Append(Constants.vbTab & "Customer: " & CustomerId & Constants.vbLf)
            strbuf.Append(Constants.vbTab & "OrderDetails" & Constants.vbLf)

            For Each lineItem In orderLineItems
                strbuf.Append(Constants.vbTab + Constants.vbTab + lineItem.ToString())
            Next lineItem

            strbuf.Append(Constants.vbTab & "Total cost of this order: $" & TotalCost & Constants.vbLf)
            strbuf.Append(Constants.vbTab & "Order status: " & Status + Constants.vbLf)
            Return strbuf.ToString()
        End Function
    End Class


    ' Service class which implements the service contract.
    ' Added code to write output to the console window
    <ServiceBehavior(InstanceContextMode:=InstanceContextMode.PerSession)> _
    Public Class OrderTakerService
        Implements IOrderTaker
        Private po As PurchaseOrder

        <OperationBehavior(TransactionScopeRequired := True, TransactionAutoComplete := False)> _
        Public Sub OpenPurchaseOrder(ByVal customerId As String) Implements IOrderTaker.OpenPurchaseOrder
            Console.WriteLine("Creating purchase order")
            po = New PurchaseOrder(customerId)
        End Sub

        <OperationBehavior(TransactionScopeRequired := True, TransactionAutoComplete := False)> _
        Public Sub AddProductLineItem(ByVal productId As String, ByVal quantity As Integer) Implements IOrderTaker.AddProductLineItem
            po.AddProductLineItem(productId, quantity)
            Console.WriteLine("Product " & productId & " quantity " & quantity & " added to purchase order")
        End Sub

        <OperationBehavior(TransactionScopeRequired := True, TransactionAutoComplete := True)> _
        Public Sub EndPurchaseOrder() Implements IOrderTaker.EndPurchaseOrder
            Console.WriteLine("Purchase Order Completed")
            Console.WriteLine()
            Console.WriteLine(po.ToString())
        End Sub


        ' Host the service within this EXE console application.
        Public Shared Sub Main()
            ' Get MSMQ queue name from app settings in configuration
            Dim queueName As String = ConfigurationManager.AppSettings("queueName")

            ' Create the transacted MSMQ queue if necessary.
            If (Not MessageQueue.Exists(queueName)) Then
                MessageQueue.Create(queueName, True)
            End If


            ' Get the base address that is used to listen for WS-MetaDataExchange requests
            Dim baseAddress As String = ConfigurationManager.AppSettings("baseAddress")

            ' Create a ServiceHost for the OrderTakerService type.
            Using serviceHost As New ServiceHost(GetType(OrderTakerService), New Uri(baseAddress))
                ' Open the ServiceHostBase to create listeners and start listening for messages.
                serviceHost.Open()

                ' The service can now be accessed.
                Console.WriteLine("The service is ready.")
                Console.WriteLine("Press <ENTER> to terminate service.")
                Console.WriteLine()
                Console.ReadLine()

                ' Close the ServiceHostBase to shutdown the service.
                serviceHost.Close()
            End Using
        End Sub
    End Class
End Namespace

// Service Code:

using System;

using System.ServiceModel.Channels;
using System.Configuration;
using System.Messaging;
using System.ServiceModel;
using System.Transactions;
using System.Text;
using System.Collections.Generic;

namespace Microsoft.ServiceModel.Samples
{
    // Define a service contract. 
    [ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples", SessionMode=SessionMode.Required)]
    public interface IOrderTaker
    {
        [OperationContract(IsOneWay = true)]
        void OpenPurchaseOrder(string customerId);

        [OperationContract(IsOneWay = true)]
        void AddProductLineItem(string productId, int quantity);

        [OperationContract(IsOneWay = true)]
        void EndPurchaseOrder();
    }

    // Define the Purchase Order Line Item
    public class PurchaseOrderLineItem
    {
        static Random r = new Random(137);

        string ProductId;
        float UnitCost;
        int Quantity;

        public PurchaseOrderLineItem(string productId, int quantity)
        {
            this.ProductId = productId;
            this.Quantity = quantity;
            this.UnitCost = r.Next(10000);
        }

        public override string ToString()
        {
            String displayString = "Order LineItem: " + Quantity + " of " + ProductId + " @unit price: $" + UnitCost + "\n";
            return displayString;
        }

        public float TotalCost
        {
            get { return UnitCost * Quantity; }
        }
    }

    // Define Purchase Order
    public class PurchaseOrder
    {
        string PONumber;
        string CustomerId;
        LinkedList<PurchaseOrderLineItem> orderLineItems = new LinkedList<PurchaseOrderLineItem>();

        public PurchaseOrder(string customerId)
        {
            this.CustomerId = customerId;
            this.PONumber = Guid.NewGuid().ToString();
        }

        public void AddProductLineItem(string productId, int quantity)
        {
            orderLineItems.AddLast(new PurchaseOrderLineItem(productId, quantity));
        }

        public float TotalCost
        {
            get
            {
                float totalCost = 0;
                foreach (PurchaseOrderLineItem lineItem in orderLineItems)
                    totalCost += lineItem.TotalCost;
                return totalCost;
            }
        }

        public string Status
        {
            get
            {
                return "Pending";
            }
        }

        public override string ToString()
        {
            StringBuilder strbuf = new StringBuilder("Purchase Order: " + PONumber + "\n");
            strbuf.Append("\tCustomer: " + CustomerId + "\n");
            strbuf.Append("\tOrderDetails\n");

            foreach (PurchaseOrderLineItem lineItem in orderLineItems)
            {
                strbuf.Append("\t\t" + lineItem.ToString());
            }

            strbuf.Append("\tTotal cost of this order: $" + TotalCost + "\n");
            strbuf.Append("\tOrder status: " + Status + "\n");
            return strbuf.ToString();
        }
    }


    // Service class which implements the service contract.
    // Added code to write output to the console window
    [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
    public class OrderTakerService : IOrderTaker
    {
        PurchaseOrder po;

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = false)]
        public void OpenPurchaseOrder(string customerId)
        {
            Console.WriteLine("Creating purchase order");
            po = new PurchaseOrder(customerId);
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = false)]
        public void AddProductLineItem(string productId, int quantity)
        {
            po.AddProductLineItem(productId, quantity);
            Console.WriteLine("Product " + productId + " quantity " + quantity + " added to purchase order");
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public void EndPurchaseOrder()
        {
            Console.WriteLine("Purchase Order Completed");
            Console.WriteLine();
            Console.WriteLine(po.ToString());
        }


        // Host the service within this EXE console application.
        public static void Main()
        {
            // Get MSMQ queue name from app settings in configuration
            string queueName = ConfigurationManager.AppSettings["queueName"];

            // Create the transacted MSMQ queue if necessary.
            if (!MessageQueue.Exists(queueName))
                MessageQueue.Create(queueName, true);


            // Get the base address that is used to listen for WS-MetaDataExchange requests
            string baseAddress = ConfigurationManager.AppSettings["baseAddress"];

            // Create a ServiceHost for the OrderTakerService type.
            using (ServiceHost serviceHost = new ServiceHost(typeof(OrderTakerService), new Uri(baseAddress)))
            {
                // Open the ServiceHostBase to create listeners and start listening for messages.
                serviceHost.Open();

                // The service can now be accessed.
                Console.WriteLine("The service is ready.");
                Console.WriteLine("Press <ENTER> to terminate service.");
                Console.WriteLine();
                Console.ReadLine();

                // Close the ServiceHostBase to shutdown the service.
                serviceHost.Close(); 
            }
        }
    }
}

<!-- Service Config File: -->
<appSettings>
  <!-- use appSetting to configure MSMQ queue name -->
  <add key="queueName" value=".\private$\ServiceModelSamplesSession" />
  <add key="baseAddress" value="http://localhost:8000/orderTaker/sessionSample"/>
</appSettings>

<system.serviceModel>
  <services>
    <service name="Microsoft.ServiceModel.Samples.OrderTakerService"
             behaviorConfiguration="MyServiceTypeBehaviors" >

      <!-- Define NetMsmqEndpoint -->
      <endpoint address="net.msmq://localhost/private/ServiceModelSamplesSession"
                binding="netMsmqBinding"
                contract="Microsoft.ServiceModel.Samples.IOrderTaker" />
      <endpoint contract="IMetadataExchange" binding="mexHttpBinding" address="mex" />

    </service>
  </services>

  <behaviors>
    <serviceBehaviors>
      <behavior name="MyServiceTypeBehaviors" >
        <serviceMetadata httpGetEnabled="true" />
      </behavior>
    </serviceBehaviors>
  </behaviors>

</system.serviceModel>

Code for the Client

Imports System
Imports System.Configuration
Imports System.Messaging
Imports System.ServiceModel
Imports System.Transactions

Namespace Microsoft.ServiceModel.Samples
    'The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.

    'Client implementation code.
    Friend Class Client
        Shared Sub Main()
            'Create a transaction scope.
            Using scope As New TransactionScope(TransactionScopeOption.Required)
                ' Create a proxy with given client endpoint configuration
                Dim client As New OrderTakerClient("OrderTakerEndpoint")
                Try
                    ' Open a purchase order
                    client.OpenPurchaseOrder("somecustomer.com")
                    Console.WriteLine("Purchase Order created")

                    ' Add product line items
                    Console.WriteLine("Adding 10 quantities of blue widget")
                    client.AddProductLineItem("Blue Widget", 10)

                    Console.WriteLine("Adding 23 quantities of red widget")
                    client.AddProductLineItem("Red Widget", 23)

                    ' Close the purchase order
                    Console.WriteLine("Closing the purchase order")
                    client.EndPurchaseOrder()
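                    client.Close()

                    ' Complete the transaction only after every message was sent.
                    scope.Complete()
                Catch ex As CommunicationException
                    ' Abort the client; leaving the scope without Complete() rolls back the transaction.
                    client.Abort()
                End Try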
            End Using
            Console.WriteLine()
            Console.WriteLine("Press <ENTER> to terminate client.")
            Console.ReadLine()
        End Sub
    End Class
End Namespace

using System;
using System.Configuration;
using System.Messaging;
using System.ServiceModel;
using System.Transactions;

namespace Microsoft.ServiceModel.Samples
{
    //The service contract is defined in generatedProxy.cs, generated from the service by the svcutil tool.

    //Client implementation code.
    class Client
    {
        static void Main()
        {
            //Create a transaction scope.
            using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
            {
                // Create a proxy with given client endpoint configuration
                OrderTakerClient client = new OrderTakerClient("OrderTakerEndpoint");
                try
                {
                    // Open a purchase order
                    client.OpenPurchaseOrder("somecustomer.com");
                    Console.WriteLine("Purchase Order created");

                    // Add product line items
                    Console.WriteLine("Adding 10 quantities of blue widget");
                    client.AddProductLineItem("Blue Widget", 10);

                    Console.WriteLine("Adding 23 quantities of red widget");
                    client.AddProductLineItem("Red Widget", 23);

                    // Close the purchase order
                    Console.WriteLine("Closing the purchase order");
                    client.EndPurchaseOrder();
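                    client.Close();

                    // Complete the transaction only after every message was sent.
                    scope.Complete();
                }
                catch (CommunicationException)
                {
                    // Abort the client; leaving the scope without Complete() rolls back the transaction.
                    client.Abort();
                }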
            }
            Console.WriteLine();
            Console.WriteLine("Press <ENTER> to terminate client.");
            Console.ReadLine();
        }
    }
}

<system.serviceModel>

  <client>
    <!-- Define NetMsmqEndpoint -->
    <endpoint name="OrderTakerEndpoint"
              address="net.msmq://localhost/private/ServiceModelSamplesSession" 
              binding="netMsmqBinding" 
              contract="IOrderTaker" />
  </client>

</system.serviceModel>

See Also

Tasks

Sessions and Queues

Concepts

Queues Overview