Udostępnij przez


Korelacja komunikatów

Przykład MessageCorrelation pokazuje, jak aplikacja kolejkowania komunikatów (MSMQ) może wysyłać komunikat MSMQ do usługi Windows Communication Foundation (WCF) i jak komunikaty mogą być skorelowane między aplikacjami nadawcy i odbiorcy w scenariuszu żądania/odpowiedzi. W tym przykładzie użyto powiązania msmqIntegrationBinding. Usługa w tym przypadku jest aplikacją konsolową hostowaną samodzielnie, aby umożliwić obserwowanie usługi, która odbiera komunikaty w kolejce. k

Usługa przetwarza komunikat otrzymany od nadawcy i wysyła komunikat odpowiedzi z powrotem do nadawcy. Nadawca koreluje odpowiedź odebraną do wysłanego pierwotnie żądania. Właściwości MessageID i CorrelationID komunikatu służą do korelowania komunikatów żądania i odpowiedzi.

Kontrakt usługi IOrderProcessor określa jednokierunkową operację usługi, która jest odpowiednia do użycia z systemem kolejkowym. Komunikat MSMQ nie posiada nagłówka Akcji, więc nie można automatycznie przypisywać różnych wiadomości MSMQ do kontraktów operacji. W związku z tym w tym przypadku może istnieć tylko jeden kontrakt operacji. Jeśli chcesz zdefiniować więcej kontraktów operacyjnych w usłudze, aplikacja musi podać informacje, który nagłówek w komunikacie MSMQ (na przykład etykieta lub identyfikator korelacji) może być użyty do decyzji, który kontrakt operacyjny ma zostać wywołany.

Komunikat MSMQ nie zawiera również informacji o tym, które nagłówki są mapowane na różne parametry kontraktu operacji. W związku z tym w kontrakcie operacji może istnieć tylko jeden parametr. Parametr jest typu MsmqMessage<T>, który zawiera podstawowy komunikat MSMQ. Typ "T" w MsmqMessage<T> klasie reprezentuje dane serializowane w treści komunikatu MSMQ. W tym przykładzie PurchaseOrder typ jest serializowany do treści komunikatu MSMQ.

[ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples")]
[ServiceKnownType(typeof(PurchaseOrder))]
public interface IOrderProcessor
{
    [OperationContract(IsOneWay = true, Action = "*")]
    void SubmitPurchaseOrder(MsmqMessage<PurchaseOrder> msg);
}

Operacja usługi przetwarza zamówienie zakupu i wyświetla zawartość zamówienia zakupu oraz jej stan w oknie konsoli usługi. Polecenie OperationBehaviorAttribute konfiguruje operację w celu przyłączenia się do transakcji z kolejką i oznaczenia transakcji jako zakończonej po zakończeniu operacji. PurchaseOrder zawiera szczegóły zamówienia, które muszą zostać przetworzone przez usługę.

// Service class that implements the service contract.
public class OrderProcessorService : IOrderProcessor
{
   [OperationBehavior(TransactionScopeRequired = true,
          TransactionAutoComplete = true)]
   public void SubmitPurchaseOrder(MsmqMessage<PurchaseOrder> ordermsg)
   {
       PurchaseOrder po = (PurchaseOrder)ordermsg.Body;
       Random statusIndexer = new Random();
       po.Status = PurchaseOrder.OrderStates[statusIndexer.Next(3)];
       Console.WriteLine("Processing {0} ", po);
       //Send a response to the client that the order has been received
       // and is pending fulfillment.
       SendResponse(ordermsg);
    }

    private void SendResponse(MsmqMessage<PurchaseOrder> ordermsg)
    {
        OrderResponseClient client = new OrderResponseClient("OrderResponseEndpoint");

        //Set the correlation ID such that the client can correlate the response to the order.
        MsmqMessage<PurchaseOrder> orderResponseMsg = new MsmqMessage<PurchaseOrder>(ordermsg.Body);
        orderResponseMsg.CorrelationId = ordermsg.Id;
        using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
        {
            client.SendOrderResponse(orderResponseMsg);
            scope.Complete();
        }

        client.Close();
    }
}

Usługa używa niestandardowego klienta OrderResponseClient do wysyłania komunikatu MSMQ do kolejki. Ponieważ aplikacja, która odbiera i przetwarza komunikat, jest aplikacją MSMQ, a nie aplikacją WCF, nie ma niejawnego kontraktu usługi między dwiema aplikacjami. Dlatego nie możemy utworzyć serwera proxy przy użyciu narzędzia Svcutil.exe w tym scenariuszu.

Niestandardowy serwer proxy jest zasadniczo taki sam dla wszystkich aplikacji WCF, które używają powiązania msmqIntegrationBinding do wysyłania wiadomości. W przeciwieństwie do innych serwerów proxy, nie obejmuje zakresu operacji serwisowych. Jest to tylko operacja przesyłania komunikatów.

[System.ServiceModel.ServiceContractAttribute(Namespace = "http://Microsoft.ServiceModel.Samples")]
public interface IOrderResponse
{

    [System.ServiceModel.OperationContractAttribute(IsOneWay = true, Action = "*")]
    void SendOrderResponse(MsmqMessage<PurchaseOrder> msg);
}

public partial class OrderResponseClient : System.ServiceModel.ClientBase<IOrderResponse>, IOrderResponse
{

    public OrderResponseClient()
    { }

    public OrderResponseClient(string configurationName)
        : base(configurationName)
    { }

    public OrderResponseClient(System.ServiceModel.Channels.Binding binding, System.ServiceModel.EndpointAddress address)
        : base(binding, address)
    { }

    public void SendOrderResponse(MsmqMessage<PurchaseOrder> msg)
    {
        base.Channel.SendOrderResponse(msg);
    }
}

Usługa jest hostowana samodzielnie. W przypadku korzystania z transportu integracji MSMQ kolejka musi zostać utworzona z wyprzedzeniem. Można to zrobić ręcznie lub za pomocą kodu. W tym przykładzie usługa zawiera System.Messaging kod umożliwiający sprawdzenie istnienia kolejki i utworzenie jej w razie potrzeby. Nazwa kolejki jest odczytywana z pliku konfiguracji.

public static void Main()
{
       // Get the MSMQ queue name from application settings in configuration.
      string queueName =
                ConfigurationManager.AppSettings["orderQueueName"];
      // Create the transacted MSMQ queue if necessary.
      if (!MessageQueue.Exists(queueName))
                MessageQueue.Create(queueName, true);
     // Create a ServiceHost for the OrderProcessorService type.
     using (ServiceHost serviceHost = new
                   ServiceHost(typeof(OrderProcessorService)))
     {
            serviceHost.Open();
            // The service can now be accessed.
            Console.WriteLine("The service is ready.");
            Console.WriteLine("Press <ENTER> to terminate service.");
            Console.ReadLine();
            // Close the ServiceHost to shutdown the service.
            serviceHost.Close();
      }
}

Kolejka MSMQ, do której są wysyłane żądania zamówienia, jest określona w sekcji appSettings pliku konfiguracji. Punkty końcowe klienta i usługi są zdefiniowane w sekcji system.serviceModel pliku konfiguracji. Oba określają msmqIntegrationBinding powiązanie.

<appSettings>
  <add key="orderQueueName" value=".\private$\Orders" />
</appSettings>

<system.serviceModel>
  <client>
    <endpoint    name="OrderResponseEndpoint"
              address="msmq.formatname:DIRECT=OS:.\private$\OrderResponse"
              binding="msmqIntegrationBinding"
              bindingConfiguration="OrderProcessorBinding"
              contract="Microsoft.ServiceModel.Samples.IOrderResponse">
    </endpoint>
  </client>

  <services>
    <service
      name="Microsoft.ServiceModel.Samples.OrderProcessorService">
      <endpoint address="msmq.formatname:DIRECT=OS:.\private$\Orders"
                            binding="msmqIntegrationBinding"
                bindingConfiguration="OrderProcessorBinding"
                contract="Microsoft.ServiceModel.Samples.IOrderProcessor">
      </endpoint>
    </service>
  </services>

  <bindings>
    <msmqIntegrationBinding>
      <binding name="OrderProcessorBinding" >
        <security mode="None" />
      </binding>
    </msmqIntegrationBinding>
  </bindings>

</system.serviceModel>

Aplikacja kliencka używa System.Messaging do wysyłania trwałego i transakcyjnego komunikatu do kolejki. Treść wiadomości zawiera zamówienie zakupu.

static void PlaceOrder()
{
    //Connect to the queue
    MessageQueue orderQueue =
            new MessageQueue(
                    ConfigurationManager.AppSettings["orderQueueName"])
    // Create the purchase order.
    PurchaseOrder po = new PurchaseOrder();
    po.CustomerId = "somecustomer.com";
    po.PONumber = Guid.NewGuid().ToString();
    PurchaseOrderLineItem lineItem1 = new PurchaseOrderLineItem();
    lineItem1.ProductId = "Blue Widget";
    lineItem1.Quantity = 54;
    lineItem1.UnitCost = 29.99F;

    PurchaseOrderLineItem lineItem2 = new PurchaseOrderLineItem();
    lineItem2.ProductId = "Red Widget";
    lineItem2.Quantity = 890;
    lineItem2.UnitCost = 45.89F;

    po.orderLineItems = new PurchaseOrderLineItem[2];
    po.orderLineItems[0] = lineItem1;
    po.orderLineItems[1] = lineItem2;

    Message msg = new Message();
    msg.UseDeadLetterQueue = true;
    msg.Body = po;

    //Create a transaction scope.
    using (TransactionScope scope = new
     TransactionScope(TransactionScopeOption.Required))
    {
        // Submit the purchase order.
        orderQueue.Send(msg, MessageQueueTransactionType.Automatic);
        // Complete the transaction.
        scope.Complete();
    }
    //Save the messageID for order response correlation.
    orderMessageID = msg.Id;
    Console.WriteLine("Placed the order, waiting for response...");
}

Kolejka MSMQ, z której odbierane są odpowiedzi zamówień, jest określona w sekcji appSettings pliku konfiguracji, jak pokazano w poniższej przykładowej konfiguracji.

Uwaga / Notatka

Nazwa kolejki używa kropki (.) dla komputera lokalnego i ukośników odwrotnych w swojej ścieżce. Adres punktu końcowego programu WCF określa schemat msmq.formatname i używa wartości "localhost" dla komputera lokalnego. Poprawnie sformułowana nazwa formatu musi występować po msmq.formatname w identyfikatorze URI, zgodnie z wytycznymi MSMQ.

<appSettings>
    <add key=" orderResponseQueueName" value=".\private$\Orders" />
</appSettings>

Aplikacja kliencka zapisuje messageID komunikat żądania zamówienia, który wysyła do usługi i czeka na odpowiedź z usługi. Gdy odpowiedź pojawi się w kolejce, klient skoreluje ją z komunikatem o zamówieniu wysłanym przy użyciu właściwości komunikatu correlationID, która zawiera messageID komunikatu zamówienia, który pierwotnie wysłał do usługi.

static void DisplayOrderStatus()
{
    MessageQueue orderResponseQueue = new
     MessageQueue(ConfigurationManager.AppSettings
                  ["orderResponseQueueName"]);
    //Create a transaction scope.
    bool responseReceived = false;
    orderResponseQueue.MessageReadPropertyFilter.CorrelationId = true;
    while (!responseReceived)
    {
       Message responseMsg;
       using (TransactionScope scope2 = new
         TransactionScope(TransactionScopeOption.Required))
       {
          //Receive the Order Response message.
          responseMsg =
              orderResponseQueue.Receive
                   (MessageQueueTransactionType.Automatic);
          scope2.Complete();
     }
     responseMsg.Formatter = new
     System.Messaging.XmlMessageFormatter(new Type[] {
         typeof(PurchaseOrder) });
     PurchaseOrder responsepo = (PurchaseOrder)responseMsg.Body;
    //Check if the response is for the order placed.
    if (orderMessageID == responseMsg.CorrelationId)
    {
       responseReceived = true;
       Console.WriteLine("Status of current Order: OrderID-{0},Order
            Status-{1}", responsepo.PONumber, responsepo.Status);
    }
    else
    {
       Console.WriteLine("Status of previous Order: OrderID-{0},Order
            Status-{1}", responsepo.PONumber, responsepo.Status);
    }
  }
}

Po uruchomieniu przykładu, działania klienta i usługi są zaprezentowane zarówno w oknach konsoli usługi, jak i klienta. Usługa odbiera komunikaty od klienta i wysyła odpowiedź z powrotem do klienta. Klient wyświetla odpowiedź odebraną z usługi. Naciśnij ENTER w każdym oknie konsoli, aby zamknąć usługę i klienta.

Uwaga / Notatka

Ten przykład wymaga instalacji kolejkowania komunikatów (MSMQ). Zapoznaj się z instrukcjami dotyczącymi instalacji programu MSMQ w sekcji Zobacz również.

Konfigurowanie, kompilowanie i uruchamianie przykładu

  1. Upewnij się, że wykonano procedurę instalacji One-Time dla przykładów programu Windows Communication Foundation.

  2. Jeśli usługa jest uruchomiona jako pierwsza, sprawdzi, czy kolejka jest obecna. Jeśli kolejka nie jest obecna, usługa utworzy je. Możesz najpierw uruchomić usługę, aby utworzyć kolejkę, lub utworzyć jedną za pośrednictwem menedżera kolejki MSMQ. Wykonaj następujące kroki, aby utworzyć kolejkę w systemie Windows 2008.

    1. Otwórz Menedżera serwera w programie Visual Studio 2012.

    2. Rozwiń kartę Funkcje .

    3. Kliknij prawym przyciskiem myszy Prywatne Kolejki Wiadomościi wybierz Nowa, Prywatna Kolejka.

    4. Zaznacz pole Transakcyjne.

    5. Wprowadź ServiceModelSamplesTransacted jako nazwę nowej kolejki.

  3. Aby skompilować wersję rozwiązania w języku C# lub Visual Basic .NET, postępuj zgodnie z instrukcjami w Kompilowanie przykładów Windows Communication Foundation.

  4. Aby uruchomić przykład w konfiguracji pojedynczego komputera, postępuj zgodnie z instrukcjami w temacie Uruchamianie przykładów programu Windows Communication Foundation.

Uruchom przykład na komputerach

  1. Skopiuj pliki programu usługi z folderu \service\bin\ w folderze specyficznym dla języka na komputer usługi.

  2. Skopiuj pliki programu klienckiego z folderu \client\bin\ w folderze specyficznym dla języka na komputer kliencki.

  3. W pliku Client.exe.config zmień nazwę orderQueueName, aby określić nazwę komputera usługi zamiast ".".

  4. W pliku Service.exe.config zmień adres punktu końcowego klienta, aby określić nazwę komputera klienckiego zamiast ".".

  5. Na komputerze serwisowym uruchom Service.exe z poziomu wiersza polecenia.

  6. Na komputerze klienckim uruchom Client.exe z poziomu wiersza polecenia.

Zobacz także