Integration Patterns utilizing the Windows Azure Service Bus–Part II

In the previous post I covered the Messaging Channel group of patterns. In this post, I will cover the Message Routing Group patterns.  This group contains:

Content Based Router, Message Filter, Recipient List, Splitter, Aggregator, Resequencer, Composed Message Processor, Scatter-Gather, Process Manager and Message Broker.

So, lets dive right in to each of these patterns and see how these can be implemented.

 

ContentRouter Content-Based Router

This pattern requires the the router system inspect each message and direct that message to different channels based on data elements contained in the message.  The routing should be configurable to route based on different fields, the existence or non-existence of a specific value and the routing rules should be easily maintained as there are typically frequent changes

This pattern can be implement in Azure using a Service Bus Topic.  There are two parts that need to be implemented to put this pattern into use.  The first is that subscriptions need to be setup and the second is that a message needs to be created with the specific properties populated to match the subscription which follows the diagram below:

ContentBasedRouter

 

The following code shows to create the subscriptions as well as the submitted message.  Let’s first start by creating the subscriptions

 public static void CBRPatternExampleWithTopic(String topicName)
 {
 String serviceNamespace = ....;
 String issuer = ....;
 String key = ....;
 
 SharedSecretCredential credential =.... 
  ....
  ServiceBusNamespaceClient namespaceClient = new ServiceBusNamespaceClient(serviceBusUri, credential);
 Topic topic = namespaceClient.GetTopic(topicName);
  //using the AddSubscription(String, FilterExpression) method overload 
  topic.AddSubscription(“PremiumMemberType”,new SqlFilter("MemberType = Premium"));
  topic.AddSubscription("OtherMemberTypes", new SqlFilter("MemberType <> Premium"));
 }

    

Then we will create the message with the property populated that the subscriptions will key off of.

 public static void SubmitMessageToTopic()
 {
 String issuer = ....;
 String key = ....;
  TokenProvider tp = TokenProvider.CreateSharedSecretTokenProvider(issuer, key); 
 Uri uri = ServiceBusEnvironment.CreateServiceUri(.., .., ..);

 MessagingFactory factory = MessagingFactory.Create(uri, tp);
 MessageSender myTopic = factory.CreateMessageSender("<TopicName>");
  BrokeredMessage message = new BrokeredMessage(); 
  //This will create a message that will be routed to the first subscription above 
  message.Properties.Add("MemberType", Premium); 
  //OR add the following property instead to have it routed to the other subscription above
 message.Properties.Add("MemberType", Gold);
 myTopic.Send(message);

 

Filter Message Filter

This pattern requires that any message that doesn’t match the criteria of a subscription will be discarded.  This pattern is used to eliminate any messages that aren’t routed to a subscriber such as those setup using the Content Based Router pattern.  If the message doesn’t fall into a filter used by the subscriber then the message should be routed to a filter channel. 

When using an Azure Service Bus Topic you can utilize the RuleDescription object along with the FilterAction and FilterExpression objects to setup the subscriber filters.  You can explicitly utilize the MatchNoneFilterExpression object to subscribe to messages that haven’t been routed to any other subscriber.  Once routed to this subscriber you can either log that the message wasn’t picked up and you can either process it or discard the message

The following code shows how to add a subscriber to the Topic for a MatchNoneFilterExpression

 public static void AddFilterPatternSubscriptionToTopic(String topicName)
 {
 String serviceNamespace = ....;
 String issuer = ....;
 String key = ....;
 
 SharedSecretCredential credential =.... 
  ....
  ServiceBusNamespaceClient namespaceClient = new ServiceBusNamespaceClient(serviceBusUri, credential);
 Topic topic = namespaceClient.GetTopic(topicName);
  <place other subscriber code here>
  RuleDescription matchNoneRule = new RuleDescription()
 {
     FilterAction = new SqlFilterAction(“set defer = ‘yes’;”),
     FilterExpression = new MatchNoneFilterExpression()
 };
 
  //using the AddSubscription(String, RuleDescription) method overload
  Subscription matchNoneFilterSubscription =
 topic.AddSubscription(“matchNoneFilterSubscription”, matchNoneRule);  
  }
    

DynamicRouter Dynamic Router

This pattern provides for the scenario in that each recipient would register and list the conditions or rules for which messages or types of messages it can handle.  The conditions or rules are stored so that when a message arrives, the rules run and the message parameters are evaluated against the rules and then the message is routed accordingly to the correct receiver.  The benefit of this is that you can achieve predictive routing, with eased maintenance and less dependency on each receiver.

This pattern is implemented in Azure by default in both the Topic and the Queues.  When you create a subscription and a receiver for a topic you are registering with the service and as part of that registration process you also provide rules or conditions for the types of messages or the value of properties that you want to filter for. 

But what if we want to take this a step further and provide even more dynamic functionality.  What if we could implement the functionality that is available in BizTalk called Promoted Properties but do it completely within the functionality of Azure.  Promoted Properties is a way of taking data contained in the body of an object or message and put that in a properties collection that can then be used to route messages.  When working with Azure, the BrokeredMessage object contains a properties collection that can be used to route however, you manually have to set these properties at design time.  Our goal is to take the values of properties in the object body itself and promote those to the BrokeredMessage properties collection so that they can be assigned and routed at run time.

The subscribers on the Topic still need to be created and registered but this functionality allows the data in the message to direct the flow of the message and provides further dynamic routing scenarios. 

I have been working with Michael Muta (MCS Consultant) and he came up with a great implementation of this pattern.  Lets look at how this can be accomplished. 

First, we would create a new class that will be serialized as the message body of the BrokeredMessage.  In this case we will create a class for membership.

     [Serializable]
    public class Member
    {
        public Member()
        {
            
        }

        public bool NewMember { get; set; }
        public string Name { get; set; }
        public string Address { get; set; }

        [PromotedProperty("ReceiveEMail", PropertyDirection.ReadAndWrite)]   
        public string ReceiveEmailAds { get; set; }

        [PromotedProperty("Licensed", PropertyDirection.ReadOnly)]
        public bool Licensed
        {
            get { return (....); }
        }
    }

As you can see we have decorated a number of the properties with custom attributes.  We decorate every property that we will want to add to the BrokeredMessage properties collection.  In addition, we can set the PropertyDirection to either ReadOnly or ReadAndWrite.  ReadOnly certainly makes sense – we just need to take a value of the property and promote that into the collection.  However, we can take this beyond just static promotion.  With the ReadAndWrite we can provide the functionality to update the property as the message is received from the Topic.  With the Subscriptions on our Topics we have the ability to perform an action in the rules.  In that action we can set the value of a property, and in this case, we can push that modification all that way back to the internal property on the object.  We can then use that for other downstream actions in our processing of the message.  This give us a lot more flexibility and functionality in our solution. 

After we have our object decorated we can put together the code to send the message.  We can take the same method that we used earlier to submit a message but we will do a couple of different things. 

As you will notice, the code to submit the message is pretty much the same – we create the factory, create the message sender, create an instance of our member object and create a BrokeredMessage.  In order to get our promoted properties code to work as an extension to the BrokereMessage we needed to utilize the Extension Methods functionality in C#.  We needed to do this because the the BrokeredMessage class is sealed.  One of the drawbacks of the Extension Methods functionality is that it doesn’t work on the objects constructor.  Therefore, we need to call the PromoteProperties method that is placed on the BrokeredMessage class.  By doing this, this method will interrogate all of decorated properties on your object and promote them. 

 public static void SubmitMessageToTopic()
 {
 String issuer = ....;
 String key = ....;
  TokenProvider tp = TokenProvider.CreateSharedSecretTokenProvider(issuer, key); 
 Uri uri = ServiceBusEnvironment.CreateServiceUri(.., .., ..);

 MessagingFactory factory = MessagingFactory.Create(uri, tp);
 MessageSender myTopic = factory.CreateMessageSender("<TopicName>");
  
  //Create our message body object 
  Member memberItem = new Member();
  BrokeredMessage message = new BrokeredMessage(memberItem); 
  //Now that we have created the message and passed in our object we need to do one more thing 
  message.PromoteProperties(memberItem); 
  //even thought we are promoting properties, we can still add properties at design time as usual as shown below
  //message.Properties.Add("MemberType", Gold);
  myTopic.Send(message);
 }

One other item I want to mention in the code above is that even though you promote properties decorated in your object you can still add properties to the BrokeredMessage properties collection at design time as usual.

 

So, we now have our object decorated and we have the code to submit the message to the Topic.  Lets look at the extension method that promotes the properties.

 public static void PromoteProperties(this BrokeredMessage msg, object serializableObject)
        {
            PropertyInfo[] properties = serializableObject.GetType().GetProperties();

            foreach (PropertyInfo prop in properties)
            {
                foreach (PromotedProperty promotedProp in prop.GetCustomAttributes(typeof(PromotedProperty), true))
                {
                    object value = prop.GetValue(serializableObject, null);
                    msg.Properties.Add(promotedProp.Name, value);
                }
            }
        }

This method loops through the properties in the object, find the promoted properties and adds the value of the property with the name provided in the attribute to the BrokeredMessage’s property collection.  There are other other methods in the extension class for things such as setting the properties value for our ReadAndWrite property direction but this gives an overview on how we can add this functionality and provide not only a great implementation of this pattern but also extend the functionality of the Azure Topics and routing functionality.

 

RecipientList Recipient List

This pattern is related to the Content Based Router pattern in that the CBR pattern routes messages based on the message content.  The Recipient List pattern provides a means for the message to contain a list of one or more recipients to receive the message.  This differs from CBR in that we may want to submit a message to multiple order fulfillment suppliers in order to receive back prices thus allowing for the comparison of responses and the selection of the cheapest supplier as shown in the diagram below:

RecipientList

This pattern can be implemented in the Azure Service Bus Topic very similarly to that of the Content Based Router example. The difference will be that we will create the subscriptions using the SQLFilter but will utilize the LIKE keyword. This way the sender can populate the property with a comma separated list and the subscriptions will pick it up and route to N number of receivers.

So, if we created three subscriptions

 topic.AddSubscription("OtherMemberTypes", new SqlFilter("MemberType LIKE ‘%Premium%’"));
 topic.AddSubscription("OtherMemberTypes", new SqlFilter("MemberType LIKE ‘%Gold%’"));
topic.AddSubscription("OtherMemberTypes", new SqlFilter("MemberType LIKE ‘%Silver%’"));

and we wanted to create a message that would be routed to Premium and Gold we would create a BrokeredMessage and set the following property:

 message.Properties.Add("MemberType", “Premium,Gold”);
 This would allow us to set, at the client,  the recipients that we wanted to send the message to.
  

Splitter Splitter

This pattern provides the ability to break apart a composite message into its separate message parts and process the related message parts separately.  This pattern is useful in scenarios where you might have an order that has many order items and each of the order items need to be sent to different receivers or sent to different inventory systems.  

This pattern can be implemented in Azure using the Service Bus Topic or Queue by utilizing the sessions feature.  To setup a queue or topic for sessions we need to set the RequiresSession property in the QueueDescription or SubscriptionDescription.  Then each message needs a SessionId in order to relate the individual messages.  The receiver will read a message and accept the session.  Once the session is accepted, the session is locked and other receivers won’t be able accept it.  This ensures that there are no competing receivers.  One thing to note about sessions and Azure is that session ordering is guaranteed even if the receiver crashes.  Another thing to think about is this pattern comes in handy when you need to provide support for sending messages larger than the allowed message size.  To send larger message, you can chunk the message into smaller sizes and use the session id in order to receive all the messages in the same session and then reassemble them.

There is a great example of this that Alan Smith has put together so I won’t duplicate the effort.  Take a look at his blog for an example of implementing this pattern.

 

Aggregator Aggregator

This pattern is related to the splitter pattern.  Whereas the splitter processes separate messages, the aggregator is used to bring all the separate messages back together.

This pattern can be implemented in Azure using the Service Bus Topic and correlation.  When setting up the subscription you can use the CorrelationFilterExpression just as we did with the FilterExpressions in the code samples shown above in previous patterns.  So the submitter would set the CorrelationId property of the BrokeredMessage object to an identifier.  Then this same identifier would be used in the CorrelationFilter object that was configured when creating the subscription.  Also, you will want to know when the processing is finished for this correlation set.  You could easily create a custom property on the BrokeredMessage object (as we did previously with the MemberType property) that lists the total number of messages that are in the set or you may just decide to create a flag that states that this is the last message in the set.  Once the total count has been received or the message containing the flag is received then you can perform whatever task is required to aggregate all the messages together.

You might ask, why use Correlation over Session.  The CorrelationId property provides an exact match, the CorrelationId is saved in a hash table and the CorrelationId processing is better optimized for matching performance than the SQL expression filters.

There is a great example of this that Alan Smith has put together that is in the same article as his Splitter implementation so I won’t duplicate the effort. Take a look at his blog for an example of implementing this pattern.

 

 resequencer Resequencer

The Resequencer pattern provides guidance around putting messages back in order.  Individual messages may be processed by a downstream system and returned faster than other downstream systems can process their messages and thus leading to messages that are out of order.  This pattern uses a buffer to store messages so that they can be resequenced.  Typically the Resequencer does not modify the message in any way.  Once in the required order, the messages can be delivered as shown in the following diagram. 

Resequencer

 

This pattern can be implemented in Azure through a Service Bus Queue.  The queue becomes the buffer to not only store the messages that are submitted but also to store the messages that are out of order.  There are a number of steps that will be required to put this pattern in practice.  First, there is no change to the code that is used to submit the messages to the queue however you will need to populate the MessageId property of the BrokeredMessage object.  While there is nothing out of the ordinary to submit, there is a bit of work to be done by the receiver.  Next, we are going to take advantage of the Defer() method which will allow us to put a message back on to the queue that we detect is in the wrong sequence.  Then we can dequeue it later when we need that message by calling the receive message and passing in the message sequence.  This means that we need to keep a list of message sequence numbers in our code.  One thing to note is that once we defer a message and put it back on the queue the only way that we can retrieve it again is to call the receive and pass in the sequence number.  Another thing to note is that the Service Bus Sequencing may be different than the applications sequence numbering.  Therefore you may need to store both in either session state or an internal list.

Lets look at the code to put this pattern into place. 

First, let’s look at what is needed to send the message to the queue.

  string issuer = "....";
 string key = "....";

 //create the token provider and URI that are needed to send to the MessagingFactory
 TokenProvider tp = TokenProvider.CreateSharedSecretTokenProvider(issuer, key); 
 Uri uri = ServiceBusEnvironment.CreateServiceUri("sb", "<ServiceNamespace>", string.Empty);

 //retrieve the MessageSender for the SeqQueue within the namespace
 MessagingFactory factory = MessagingFactory.Create(uri, tp);
 MessageSender SeqQueue = factory.CreateMessageSender("SeqQueue");
  //create a message and use a string for the body.  We will put the sequence here as well as in a property.
 BrokeredMessage message1 = new BrokeredMessage("MessageSequence 1”);

 //set some additional custom app-specific properties
 message.Properties["MessageNumber"] = "1";
 message.Properties["TotalMsgsInSequence"] = "3";   
 //send the first message to the queue
 SeqQueue.Send(message1);
  
 //create the next message in the sequence. This message will be out of order.
 BrokeredMessage message3 = new BrokeredMessage("MessageSequence 3”);
 message.Properties["MessageNumber"] = "3";
 message.Properties["TotalMsgsInSequence"] = "3";   
 //send the first message to the queue
 SeqQueue.Send(message3);
 //create the last message in the sequence.
 BrokeredMessage message2 = new BrokeredMessage("MessageSequence 2”);
 message.Properties["MessageNumber"] = "2";
 message.Properties["TotalMsgsInSequence"] = "3";   
 //send the first message to the queue
 SeqQueue.Send(message2);
 

Now we have three messages in our queue that are out of order.  Next, let’s look at what is needed to receive the message and process them in order.  By the way, I will be utilizing the QueueClient object as this supports more advanced features (including sessions).

 private static void ReadMessagesFromQueue()
 {
 .... 
 // Read messages from queue until queue is empty: 
 
MessageReceiver Qreceiver = queueClient.CreateReceiver(); 
List<MessageReceipt> MessageReceiptsDeferred = new List<MessageReceipt>(); 
 
BrokeredMessage receivedMessage; 
while (Qreceiver.TryReceive(TimeSpan.FromSeconds(10), out receivedMessage)) 
{ 
     
   msgCount++;
  //check the received message and determine if it is in the sequence we need 
  if (receivedMessage.Properties["MessageNumber"].ToString() == msgCount) 
  { 
     
     ProcessEachMessage(receivedMessage); 
  } 
  else 
  { 
     receivedMessage.Defer();
      //store the message receipt for later 
     MessageReceiptsDeferred.Add(receivedMessage.MessageReceipt); 
  } 
} 
 
 
//we have now processed messages 1 and 2.  We need to go back and get message 3. 
foreach (MessageReceipt receipt in MessageReceiptsDeferred) 
{ 
      ProcessEachMessage(Qreceiver.Receive(receipt)); 
} 
 
 
Qreceiver.Close(); 
 
 
 }
 private static void ProcessEachMessage(BrokeredMessage message) 
{ 
         //This is where we do the work we need to on the messages (i.e., reassemble, send off to a LOB system, etc.)
          //Do your work here
        
         message.Complete(); 
} 
 

 

This example shows you how we can put this pattern into code.  As with all code examples, there is still work that needs to be done to this code if you are going to use this as a sample. 

As you think about putting this pattern into practice, it could be useful for situations in which you want to process messages that have high and low priority all on one queue.  You may not know what order the messages are submitted but you can receive all the messages, process the ones marked as high priority and defer the other submitted messages and then go back and process those messages after all the high priority messages have been drained.

 

Composed Composed Msg. Processor

This pattern provides guidance for message processing where there are a number of sub messages (i.e., order line items) that need to be routed to individual destinations, processed, returned and aggregated back into a single message.  This pattern is composed of a number of other patterns that include the Splitter, Content Based Router and Aggregator.

Because this pattern utilizes patterns already covered in this article I won’t duplicate the code as that can be found in each patterns implementation.  However, I will include a diagram that shows how these patterns can come together to create this complex pattern.

 

ComposedMessageProcessor

 

Scatter-Gather

This pattern outlines how a single message can be sent to many receivers, or a message may be split and its parts sent to many receivers so that one of many external providers may process the message.  Once the message has been processed and returned then the messages need to be aggregated and brought back into one message that will be processed.  Think of an order that could be fulfilled by a number of suppliers.  You can scatter the messages to those suppliers and gather all the responses and decide if you are going to take the first message returned and continue processing or if you analyze all the responses and take the lowest cost supplier.  Either way, once you have received all the messages you will take the message and send it on to the next step in the process.

To put this pattern in practice we can take implementations of other patterns.  The scatter portion can be implemented through the use of the Splitter pattern whereas the Gather portion can be implemented through the use of the Aggregator pattern.  Both of which are covered above.

 

ProcessMgr Process Manager

The process manager pattern outlines how you send a message through a number of processing steps that aren’t known at design time. These steps may not even be sequential.  In addition, the sequence of the processing steps may also be modified based on the results of the previously executed processing step.

To put this pattern into practice you can utilized Windows Workflow.  There are a number of blog articles showing how you can implement the currently released version of WF 4 in your code and deploy it to Azure.  In addition, there was a CTP of WF in Azure that was released last summer.  Lastly, there is also the Workflow Activity Pack for Windows Azure that allows you to run WF on-premises and interact with artifacts in Azure.