消息相关性

MessageCorrelation 示例演示消息队列 (MSMQ) 应用程序如何将 MSMQ 消息发送到 Windows Communication Foundation (WCF) 服务,以及如何在请求/响应方案中将消息与接收方应用程序关联。 此示例使用 msmqIntegrationBinding 绑定。 在本例中,该服务是一个自托管控制台应用程序,用于监控接收排队消息的服务。 k

服务处理从发送方接收的消息,并将响应消息发送回发件人。 发送方将收到的响应与最初发送的请求相关联。 消息的MessageIDCorrelationID属性用于关联请求和响应消息。

IOrderProcessor 服务协定定义了适合与队列一起使用的单向服务操作。 MSMQ 消息没有 Action 标头,因此无法自动将不同的 MSMQ 消息映射到作协定。 因此,在这种情况下,只能有一个操作合同。 如果您希望在服务中定义更多的操作协定,则应用程序必须提供相关信息,如 MSMQ 消息中的哪个标头(例如标签或 correlationID)可用于确定要调度的操作协定。

MSMQ 消息也不包含诸如哪些标头映射到操作协定的不同参数等信息。 因此,操作合同中只能有一个参数。 参数的类型为 MsmqMessage<T>,该类型包含基础 MSMQ 消息。 类中的 MsmqMessage<T> 类型“T”表示序列化为 MSMQ 消息正文的数据。 在此示例中,PurchaseOrder 类型已被序列化到 MSMQ 消息正文中。

[ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples")]
[ServiceKnownType(typeof(PurchaseOrder))]
public interface IOrderProcessor
{
    [OperationContract(IsOneWay = true, Action = "*")]
    void SubmitPurchaseOrder(MsmqMessage<PurchaseOrder> msg);
}

服务作处理采购订单,并在服务控制台窗口中显示采购订单的内容及其状态。 OperationBehaviorAttribute 使用队列配置要在事务中登记的操作并在返回操作时标记事务已完成。 PurchaseOrder 包含必须由服务处理的订单详细信息。

// Service class that implements the service contract.
public class OrderProcessorService : IOrderProcessor
{
   [OperationBehavior(TransactionScopeRequired = true,
          TransactionAutoComplete = true)]
   public void SubmitPurchaseOrder(MsmqMessage<PurchaseOrder> ordermsg)
   {
       PurchaseOrder po = (PurchaseOrder)ordermsg.Body;
       Random statusIndexer = new Random();
       po.Status = PurchaseOrder.OrderStates[statusIndexer.Next(3)];
       Console.WriteLine("Processing {0} ", po);
       //Send a response to the client that the order has been received
       // and is pending fulfillment.
       SendResponse(ordermsg);
    }

    private void SendResponse(MsmqMessage<PurchaseOrder> ordermsg)
    {
        OrderResponseClient client = new OrderResponseClient("OrderResponseEndpoint");

        //Set the correlation ID such that the client can correlate the response to the order.
        MsmqMessage<PurchaseOrder> orderResponseMsg = new MsmqMessage<PurchaseOrder>(ordermsg.Body);
        orderResponseMsg.CorrelationId = ordermsg.Id;
        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
        {
            client.SendOrderResponse(orderResponseMsg);
            scope.Complete();
        }

        client.Close();
    }
}

该服务使用自定义客户端 OrderResponseClient 将 MSMQ 消息发送到队列。 由于接收和处理消息的应用程序是 MSMQ 应用程序,而不是 WCF 应用程序,因此这两个应用程序之间没有隐式服务协定。 因此,在此方案中无法使用 Svcutil.exe 工具创建代理。

对于使用 msmqIntegrationBinding 绑定发送消息的所有 WCF 应用程序,自定义代理实质上是相同的。 与其他代理不同,它不包括一系列服务操作。 这仅仅是一项提交消息的操作。

[System.ServiceModel.ServiceContractAttribute(Namespace = "http://Microsoft.ServiceModel.Samples")]
public interface IOrderResponse
{

    [System.ServiceModel.OperationContractAttribute(IsOneWay = true, Action = "*")]
    void SendOrderResponse(MsmqMessage<PurchaseOrder> msg);
}

public partial class OrderResponseClient : System.ServiceModel.ClientBase<IOrderResponse>, IOrderResponse
{

    public OrderResponseClient()
    { }

    public OrderResponseClient(string configurationName)
        : base(configurationName)
    { }

    public OrderResponseClient(System.ServiceModel.Channels.Binding binding, System.ServiceModel.EndpointAddress address)
        : base(binding, address)
    { }

    public void SendOrderResponse(MsmqMessage<PurchaseOrder> msg)
    {
        base.Channel.SendOrderResponse(msg);
    }
}

该服务是自托管的。 使用 MSMQ 集成传输时,必须提前创建使用的队列。 这可以手动或通过代码完成。 在此示例中,服务包含 System.Messaging 用于检查队列是否存在的代码,并在必要时创建它。 队列名称从配置文件中读取。

public static void Main()
{
       // Get the MSMQ queue name from application settings in configuration.
      string queueName =
                ConfigurationManager.AppSettings["orderQueueName"];
      // Create the transacted MSMQ queue if necessary.
      if (!MessageQueue.Exists(queueName))
                MessageQueue.Create(queueName, true);
     // Create a ServiceHost for the OrderProcessorService type.
     using (ServiceHost serviceHost = new
                   ServiceHost(typeof(OrderProcessorService)))
     {
            serviceHost.Open();
            // The service can now be accessed.
            Console.WriteLine("The service is ready.");
            Console.WriteLine("Press <ENTER> to terminate service.");
            Console.ReadLine();
            // Close the ServiceHost to shutdown the service.
            serviceHost.Close();
      }
}

订单请求发送到的 MSMQ 队列是在配置文件的 appSettings 部分指定的。 客户端和服务终结点在配置文件的 system.serviceModel 节中定义。 两者都指定 msmqIntegrationBinding 绑定。

<appSettings>
  <add key="orderQueueName" value=".\private$\Orders" />
</appSettings>

<system.serviceModel>
  <client>
    <endpoint    name="OrderResponseEndpoint"
              address="msmq.formatname:DIRECT=OS:.\private$\OrderResponse"
              binding="msmqIntegrationBinding"
              bindingConfiguration="OrderProcessorBinding"
              contract="Microsoft.ServiceModel.Samples.IOrderResponse">
    </endpoint>
  </client>

  <services>
    <service
      name="Microsoft.ServiceModel.Samples.OrderProcessorService">
      <endpoint address="msmq.formatname:DIRECT=OS:.\private$\Orders"
                            binding="msmqIntegrationBinding"
                bindingConfiguration="OrderProcessorBinding"
                contract="Microsoft.ServiceModel.Samples.IOrderProcessor">
      </endpoint>
    </service>
  </services>

  <bindings>
    <msmqIntegrationBinding>
      <binding name="OrderProcessorBinding" >
        <security mode="None" />
      </binding>
    </msmqIntegrationBinding>
  </bindings>

</system.serviceModel>

客户端应用程序使用 System.Messaging 将持久性和事务性消息发送到队列。 消息的正文包含采购订单。

static void PlaceOrder()
{
    //Connect to the queue
    MessageQueue orderQueue =
            new MessageQueue(
                    ConfigurationManager.AppSettings["orderQueueName"])
    // Create the purchase order.
    PurchaseOrder po = new PurchaseOrder();
    po.CustomerId = "somecustomer.com";
    po.PONumber = Guid.NewGuid().ToString();
    PurchaseOrderLineItem lineItem1 = new PurchaseOrderLineItem();
    lineItem1.ProductId = "Blue Widget";
    lineItem1.Quantity = 54;
    lineItem1.UnitCost = 29.99F;

    PurchaseOrderLineItem lineItem2 = new PurchaseOrderLineItem();
    lineItem2.ProductId = "Red Widget";
    lineItem2.Quantity = 890;
    lineItem2.UnitCost = 45.89F;

    po.orderLineItems = new PurchaseOrderLineItem[2];
    po.orderLineItems[0] = lineItem1;
    po.orderLineItems[1] = lineItem2;

    Message msg = new Message();
    msg.UseDeadLetterQueue = true;
    msg.Body = po;

    //Create a transaction scope.
    using (TransactionScope scope = new
     TransactionScope(TransactionScopeOption.Required))
    {
        // Submit the purchase order.
        orderQueue.Send(msg, MessageQueueTransactionType.Automatic);
        // Complete the transaction.
        scope.Complete();
    }
    //Save the messageID for order response correlation.
    orderMessageID = msg.Id;
    Console.WriteLine("Placed the order, waiting for response...");
}

从中接收订单响应的 MSMQ 队列在配置文件的 appSettings 节中指定,如以下示例配置所示。

注释

队列名称为本地计算机使用圆点 (.),并在其路径中使用反斜杠分隔符。 WCF 终结点地址指定 msmq.formatname 方案,并将“localhost”用于本地计算机。 根据 MSMQ 准则,格式正确的格式名称应遵循 URI 中的 msmq.formatname。

<appSettings>
    <add key=" orderResponseQueueName" value=".\private$\Orders" />
</appSettings>

客户端应用程序保存 messageID 它发送到服务的订单请求消息,并等待来自服务的响应。 响应到达队列后,客户端会根据消息的 correlationID 属性将其与顺序消息相关联,其中包含客户端最初发送到服务的顺序消息的 messageID

static void DisplayOrderStatus()
{
    MessageQueue orderResponseQueue = new
     MessageQueue(ConfigurationManager.AppSettings
                  ["orderResponseQueueName"]);
    //Create a transaction scope.
    bool responseReceived = false;
    orderResponseQueue.MessageReadPropertyFilter.CorrelationId = true;
    while (!responseReceived)
    {
       Message responseMsg;
       using (TransactionScope scope2 = new
         TransactionScope(TransactionScopeOption.Required))
       {
          //Receive the Order Response message.
          responseMsg =
              orderResponseQueue.Receive
                   (MessageQueueTransactionType.Automatic);
          scope2.Complete();
     }
     responseMsg.Formatter = new
     System.Messaging.XmlMessageFormatter(new Type[] {
         typeof(PurchaseOrder) });
     PurchaseOrder responsepo = (PurchaseOrder)responseMsg.Body;
    //Check if the response is for the order placed.
    if (orderMessageID == responseMsg.CorrelationId)
    {
       responseReceived = true;
       Console.WriteLine("Status of current Order: OrderID-{0},Order
            Status-{1}", responsepo.PONumber, responsepo.Status);
    }
    else
    {
       Console.WriteLine("Status of previous Order: OrderID-{0},Order
            Status-{1}", responsepo.PONumber, responsepo.Status);
    }
  }
}

运行示例时,客户端和服务活动会显示在服务和客户端控制台窗口中。 可以看到服务从客户端接收消息,并将响应发送回客户端。 客户端显示从服务收到的响应。 在每个控制台窗口中按 Enter 可以关闭服务和客户端。

注释

此示例需要安装消息队列(MSMQ)。 请参阅“另请参阅”部分中的 MSMQ 安装说明。

设置、生成和运行示例

  1. 确保已为 Windows Communication Foundation 示例 执行One-Time 安装过程。

  2. 如果先运行服务,则它将检查以确保队列存在。 如果队列不存在,服务将创建一个队列。 可以先运行服务来创建队列,也可以通过 MSMQ 队列管理器创建一个队列。 按照以下步骤在 Windows 2008 中创建队列。

    1. 在 Visual Studio 2012 中打开服务器管理器。

    2. 展开“功能”选项卡

    3. 右键单击 专用消息队列,然后选择 “新建”,专用队列

    4. 选中“事务性”框

    5. 输入 ServiceModelSamplesTransacted 作为新队列的名称。

  3. 若要生成解决方案的 C# 或 Visual Basic .NET 版本,请按照 生成 Windows Communication Foundation 示例中的说明进行操作。

  4. 若要在单台计算机配置中运行示例程序,请按照 运行 Windows 通信基础 (Windows Communication Foundation) 示例程序中的说明进行操作。

在多台计算机上运行示例

  1. 将 \service\bin\ 文件夹中的服务程序文件(在语言特定的文件夹下)复制到服务计算机。

  2. 将 \client\bin\ 文件夹中的客户端程序文件(在特定于语言的文件夹下)复制到客户端计算机。

  3. 在 Client.exe.config 文件中,更改 orderQueueName 以指定服务计算机名称,而不是使用“.”。

  4. 在 Service.exe.config 文件中,更改客户端终结点地址以指定客户端计算机名称而不是“.”。

  5. 在服务计算机上,从命令提示符启动 Service.exe。

  6. 在客户端计算机上,在命令提示符下启动 Client.exe。

另请参阅