Поделиться через


Корреляция сообщений

В примере MessageCorrelation показано, как приложение очереди сообщений (MSMQ) может отправлять сообщение MSMQ в службу Windows Communication Foundation (WCF) и как сообщения можно сопоставить между приложениями отправителя и получателя в сценарии запроса или ответа. В этом примере используется привязка msmqIntegrationBinding. Служба в этом случае — это локальное консольное приложение, позволяющее наблюдать за службой, получающей сообщения в очереди. k

Служба обрабатывает сообщение, полученное от отправителя, и отправляет ответное сообщение обратно отправителю. Отправитель сопоставляет полученный ответ с запросом, который он отправил первоначально. MessageID и CorrelationID свойства сообщения используются для сопоставления сообщений запроса и ответа.

Контракт IOrderProcessor службы определяет одностороннюю операцию службы, которая подходит для использования с очередями. В сообщении MSMQ нет заголовка действия, поэтому невозможно автоматически сопоставить различные сообщения MSMQ с контрактами операций. Таким образом, в данном случае может быть только один операционный контракт. Если вы хотите определить больше контрактов операций в службе, приложение должно предоставить сведения о том, какой заголовок в сообщении MSMQ (например, метка или корреляционный идентификатор) можно использовать для выбора контракта операции для обработки.

Сообщение MSMQ также не содержит сведений о том, какие заголовки сопоставляются с разными параметрами контракта операции. Таким образом, в контракте операции может быть только один параметр. Параметр имеет тип MsmqMessage<T>, содержащий базовое сообщение MSMQ. Тип "T" в классе MsmqMessage<T> представляет данные, которые сериализуются в тело сообщения MSMQ. В этом примере тип PurchaseOrder сериализуется в тело сообщения MSMQ.

[ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples")]
[ServiceKnownType(typeof(PurchaseOrder))]
public interface IOrderProcessor
{
    [OperationContract(IsOneWay = true, Action = "*")]
    void SubmitPurchaseOrder(MsmqMessage<PurchaseOrder> msg);
}

Операция службы обрабатывает заказ на покупку и отображает содержимое заказа на покупку и его состояние в окне консоли службы. Конфигурация OperationBehaviorAttribute настраивает операцию на участие в транзакции с очередью и пометку транзакции как завершенной после завершения выполнения операции. PurchaseOrder содержит детали заказа, которые должны быть обработаны службой.

// Service class that implements the service contract.
public class OrderProcessorService : IOrderProcessor
{
   [OperationBehavior(TransactionScopeRequired = true,
          TransactionAutoComplete = true)]
   public void SubmitPurchaseOrder(MsmqMessage<PurchaseOrder> ordermsg)
   {
       PurchaseOrder po = (PurchaseOrder)ordermsg.Body;
       Random statusIndexer = new Random();
       po.Status = PurchaseOrder.OrderStates[statusIndexer.Next(3)];
       Console.WriteLine("Processing {0} ", po);
       //Send a response to the client that the order has been received
       // and is pending fulfillment.
       SendResponse(ordermsg);
    }

    private void SendResponse(MsmqMessage<PurchaseOrder> ordermsg)
    {
        OrderResponseClient client = new OrderResponseClient("OrderResponseEndpoint");

        //Set the correlation ID such that the client can correlate the response to the order.
        MsmqMessage<PurchaseOrder> orderResponseMsg = new MsmqMessage<PurchaseOrder>(ordermsg.Body);
        orderResponseMsg.CorrelationId = ordermsg.Id;
        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
        {
            client.SendOrderResponse(orderResponseMsg);
            scope.Complete();
        }

        client.Close();
    }
}

Служба использует пользовательский клиент OrderResponseClient для отправки сообщения MSMQ в очередь. Так как приложение, которое получает и обрабатывает сообщение, является приложением MSMQ, а не приложением WCF, между двумя приложениями нет неявного контракта службы. Поэтому мы не можем создать прокси-сервер с помощью средства Svcutil.exe в этом сценарии.

Настраиваемый прокси-сервер по сути одинаков для всех приложений WCF, использующих привязку msmqIntegrationBinding для отправки сообщений. В отличие от других прокси-серверов, он не включает ряд операций обслуживания. Это только операция отправки сообщений.

[System.ServiceModel.ServiceContractAttribute(Namespace = "http://Microsoft.ServiceModel.Samples")]
public interface IOrderResponse
{

    [System.ServiceModel.OperationContractAttribute(IsOneWay = true, Action = "*")]
    void SendOrderResponse(MsmqMessage<PurchaseOrder> msg);
}

public partial class OrderResponseClient : System.ServiceModel.ClientBase<IOrderResponse>, IOrderResponse
{

    public OrderResponseClient()
    { }

    public OrderResponseClient(string configurationName)
        : base(configurationName)
    { }

    public OrderResponseClient(System.ServiceModel.Channels.Binding binding, System.ServiceModel.EndpointAddress address)
        : base(binding, address)
    { }

    public void SendOrderResponse(MsmqMessage<PurchaseOrder> msg)
    {
        base.Channel.SendOrderResponse(msg);
    }
}

Служба размещена на собственных серверах. При использовании транспорта интеграции MSMQ необходимо заранее создать очередь. Это можно сделать вручную или с помощью кода. В этом примере служба содержит System.Messaging код, чтобы проверить наличие очереди и создать ее при необходимости. Имя очереди считывается из файла конфигурации.

public static void Main()
{
       // Get the MSMQ queue name from application settings in configuration.
      string queueName =
                ConfigurationManager.AppSettings["orderQueueName"];
      // Create the transacted MSMQ queue if necessary.
      if (!MessageQueue.Exists(queueName))
                MessageQueue.Create(queueName, true);
     // Create a ServiceHost for the OrderProcessorService type.
     using (ServiceHost serviceHost = new
                   ServiceHost(typeof(OrderProcessorService)))
     {
            serviceHost.Open();
            // The service can now be accessed.
            Console.WriteLine("The service is ready.");
            Console.WriteLine("Press <ENTER> to terminate service.");
            Console.ReadLine();
            // Close the ServiceHost to shutdown the service.
            serviceHost.Close();
      }
}

Очередь MSMQ, в которую отправляются запросы заказа, указана в разделе appSettings файла конфигурации. Конечные точки клиента и службы определяются в разделе system.serviceModel файла конфигурации. Оба указывают привязку msmqIntegrationBinding .

<appSettings>
  <add key="orderQueueName" value=".\private$\Orders" />
</appSettings>

<system.serviceModel>
  <client>
    <endpoint    name="OrderResponseEndpoint"
              address="msmq.formatname:DIRECT=OS:.\private$\OrderResponse"
              binding="msmqIntegrationBinding"
              bindingConfiguration="OrderProcessorBinding"
              contract="Microsoft.ServiceModel.Samples.IOrderResponse">
    </endpoint>
  </client>

  <services>
    <service
      name="Microsoft.ServiceModel.Samples.OrderProcessorService">
      <endpoint address="msmq.formatname:DIRECT=OS:.\private$\Orders"
                            binding="msmqIntegrationBinding"
                bindingConfiguration="OrderProcessorBinding"
                contract="Microsoft.ServiceModel.Samples.IOrderProcessor">
      </endpoint>
    </service>
  </services>

  <bindings>
    <msmqIntegrationBinding>
      <binding name="OrderProcessorBinding" >
        <security mode="None" />
      </binding>
    </msmqIntegrationBinding>
  </bindings>

</system.serviceModel>

Клиентское приложение использует System.Messaging для отправки устойчивого и транзакционного сообщения в очередь. Текст сообщения содержит заказ на покупку.

static void PlaceOrder()
{
    //Connect to the queue
    MessageQueue orderQueue =
            new MessageQueue(
                    ConfigurationManager.AppSettings["orderQueueName"])
    // Create the purchase order.
    PurchaseOrder po = new PurchaseOrder();
    po.CustomerId = "somecustomer.com";
    po.PONumber = Guid.NewGuid().ToString();
    PurchaseOrderLineItem lineItem1 = new PurchaseOrderLineItem();
    lineItem1.ProductId = "Blue Widget";
    lineItem1.Quantity = 54;
    lineItem1.UnitCost = 29.99F;

    PurchaseOrderLineItem lineItem2 = new PurchaseOrderLineItem();
    lineItem2.ProductId = "Red Widget";
    lineItem2.Quantity = 890;
    lineItem2.UnitCost = 45.89F;

    po.orderLineItems = new PurchaseOrderLineItem[2];
    po.orderLineItems[0] = lineItem1;
    po.orderLineItems[1] = lineItem2;

    Message msg = new Message();
    msg.UseDeadLetterQueue = true;
    msg.Body = po;

    //Create a transaction scope.
    using (TransactionScope scope = new
     TransactionScope(TransactionScopeOption.Required))
    {
        // Submit the purchase order.
        orderQueue.Send(msg, MessageQueueTransactionType.Automatic);
        // Complete the transaction.
        scope.Complete();
    }
    //Save the messageID for order response correlation.
    orderMessageID = msg.Id;
    Console.WriteLine("Placed the order, waiting for response...");
}

Очередь MSMQ, из которой получены ответы заказа, указывается в разделе appSettings файла конфигурации, как показано в следующем примере конфигурации.

Замечание

Имя очереди использует точку (.) для локального компьютера и обратные косые черты как разделители в пути. Адрес конечной точки WCF указывает схему msmq.formatname и использует localhost для локального компьютера. Правильно сформированное имя формата после msmq.formatname в URI, согласно рекомендациям MSMQ.

<appSettings>
    <add key=" orderResponseQueueName" value=".\private$\Orders" />
</appSettings>

Клиентское приложение сохраняет messageID сообщение запроса заказа, которое отправляется в службу и ожидает ответа от службы. Когда ответ поступает в очередь, клиент сопоставляет его с сообщением заказа, которое он первоначально отправил в службу, используя свойство correlationID сообщения. Это свойство содержит messageID от первоначального сообщения заказа, отправленного клиентом в службу.

static void DisplayOrderStatus()
{
    MessageQueue orderResponseQueue = new
     MessageQueue(ConfigurationManager.AppSettings
                  ["orderResponseQueueName"]);
    //Create a transaction scope.
    bool responseReceived = false;
    orderResponseQueue.MessageReadPropertyFilter.CorrelationId = true;
    while (!responseReceived)
    {
       Message responseMsg;
       using (TransactionScope scope2 = new
         TransactionScope(TransactionScopeOption.Required))
       {
          //Receive the Order Response message.
          responseMsg =
              orderResponseQueue.Receive
                   (MessageQueueTransactionType.Automatic);
          scope2.Complete();
     }
     responseMsg.Formatter = new
     System.Messaging.XmlMessageFormatter(new Type[] {
         typeof(PurchaseOrder) });
     PurchaseOrder responsepo = (PurchaseOrder)responseMsg.Body;
    //Check if the response is for the order placed.
    if (orderMessageID == responseMsg.CorrelationId)
    {
       responseReceived = true;
       Console.WriteLine("Status of current Order: OrderID-{0},Order
            Status-{1}", responsepo.PONumber, responsepo.Status);
    }
    else
    {
       Console.WriteLine("Status of previous Order: OrderID-{0},Order
            Status-{1}", responsepo.PONumber, responsepo.Status);
    }
  }
}

При запуске примера действия клиента и службы отображаются как в окнах службы, так и в консоли клиента. Вы можете увидеть, как служба получает сообщения от клиента и отправляет ответ клиенту. Клиент отображает ответ, полученный от службы. Нажмите клавишу ВВОД в каждом окне консоли, чтобы завершить работу службы и клиента.

Замечание

Для этого примера требуется установка очереди сообщений (MSMQ). См. инструкции по установке MSMQ в разделе "См. также".

Настройка, сборка и запуск примера

  1. Убедитесь, что вы выполнили процедуру настройки One-Time для образцов Windows Communication Foundation.

  2. Если сначала запущена служба, она проверит наличие очереди. Если очередь отсутствует, служба создаст ее. Сначала можно запустить службу для создания очереди или создать ее с помощью диспетчера очередей MSMQ. Выполните следующие действия, чтобы создать очередь в Windows 2008.

    1. Откройте диспетчер серверов в Visual Studio 2012.

    2. Разверните вкладку "Функции" .

    3. Щелкните правой кнопкой мыши Очереди частных сообщенийи выберите Создать, Частную очередь.

    4. Установите флажок транзакционная.

    5. Введите ServiceModelSamplesTransacted в качестве имени новой очереди.

  3. Чтобы создать версию решения на C# или Visual Basic .NET, следуйте инструкциям по сборке примеров Windows Communication Foundation .

  4. Чтобы запустить пример в конфигурации с одним компьютером, следуйте инструкциям в разделе "Примеры Windows Communication Foundation".

Запустите пример на нескольких компьютерах

  1. Скопируйте файлы программы службы из папки \service\bin\ соответствующего языка на компьютер службы.

  2. Скопируйте файлы клиентской программы из папки \client\bin\ в папку, соответствующую конкретному языку, на клиентский компьютер.

  3. В файле Client.exe.config измените параметр orderQueueName, чтобы указать имя компьютера для сервиса вместо ".".

  4. В файле Service.exe.config измените адрес конечной точки клиента, чтобы указать имя клиентского компьютера вместо ".".

  5. На компьютере службы запустите Service.exe из командной строки.

  6. На клиентском компьютере запустите Client.exe из командной строки.

См. также