December 2012
Volume 27 Number 12
Azure Insider - Microsoft Azure Service Bus: Messaging Patterns Using Sessions
By Bruno Terkaly, Ricardo Villalobos | December 2012
In one of our previous articles, we discussed the importance of using messaging patterns in the cloud in order to decouple solutions and promote easy-to-scale software architectures. (See “Comparing Azure Queues and Service Bus Queues” at msdn.microsoft.com/magazine/jj159884.) Queuing is one of these messaging patterns, and the Azure platform offers two main options to implement this approach: Queue storage services and Service Bus Queues, both of which cover scenarios where multiple consumers compete to receive and process each of the messages in a queue. This is the canonical model for supporting variable workloads in the cloud, where receivers can be dynamically added or removed based on the size of the queue, offering a load balancing/failover mechanism for the back end (see Figure 1).
Figure 1 Queuing Messaging Pattern: Each Message Is Consumed by a Single Receiver
Even though the queuing messaging pattern is a great solution for simple decoupling, there are situations where each receiver requires its own copy of the message, with the option of discarding some messages based on specific rules. A good example of this type of scenario is shown in Figure 2, which illustrates a common challenge that retail companies face when sending information to multiple branches, such as the latest products catalog or an updated price list.
Figure 2 Publisher/Subscriber Messaging Pattern: Each Message Can Be Consumed More Than Once
For these situations, the publisher/subscriber pattern is a better fit, where the receivers simply express an interest in one or more message categories, connecting to an independent subscription that contains a copy of the message stream. The Azure Service Bus implements the publisher/subscriber messaging pattern through topics and subscriptions, which greatly enhances the ability to control how messages are distributed, based on independent rules and filters. In this article, we’ll explain how to apply these Azure Service Bus capabilities using a simple real-life scenario, assuming the following requirements:
- Products should be received in order, based on the catalog page.
- Some of the stores don’t carry specific catalog categories, and products in these categories should be filtered out for each store.
- New catalog information shouldn’t be applied to the store system until all the messages have arrived.
All the code samples for this article were created with Visual Studio 2012, using C# as the programming language. You’ll also need the Azure SDK version 1.8 for .NET developers and access to an Azure subscription.
Setting up the Messaging Blueprint for the Project
Before writing any code, you need to define the different entities (topics and subscriptions) that will become part of the messaging workflow. This can be accomplished by accessing the Azure Portal at manage.windowsazure.com. Log in with your credentials and follow these steps:
1. Click the Create New icon on the bottom left of the Management Portal.
2. Click on the APP SERVICES icon, then on SERVICE BUS TOPIC and finally on CUSTOM CREATE (see Figure 3).
3. On the first dialog screen, enter the topic name and select the appropriate region and Azure subscription ID. If this is your first namespace in the selected region, the wizard will suggest a namespace name: [your entity name]-ns. You can change this value.
4. Click on the NEXT mark (right-pointing arrow) to insert the remaining properties. You can keep the default values. Click on the checkmark to create the topic.
5. Click on the Service Bus icon on the left navigation bar to get a list of namespaces. Note that you might not see the namespace listed immediately. It takes a few seconds to create the namespace and update the portal interface.
6. Select the Topic that you just created from the list, and click on ACCESS KEY, which can be found on the bottom of the screen. Record the full connection string for later use.
7. On the top of the Azure Portal screen, click on SUBSCRIPTIONS, and then on CREATE A NEW SUBSCRIPTION. In the popup dialog, enter a name (in our example, we used “Store1Sub”), and click on the arrow to continue.
8. In the next screen, keep the default values, but make sure to check the Enable sessions option. Click on the checkmark to create the subscription. Sessions will be used by subscribers to retrieve messages in sequential order.
9. Repeat steps 7 and 8 for each of the three stores.
Figure 3 Creating a New Service Bus Topic Using the Azure Portal
Once the topics and subscriptions have been created, you can also access them directly in Visual Studio. To do so, open Server Explorer (View | Server Explorer) and expand the Azure Service Bus node (see Figure 4). Right-click on the Azure Service Bus node and select Add New Connection. Enter the Namespace name, Issuer Name (usually “owner”) and Issuer Access Key you recorded when the Azure namespace was created in the portal.
Figure 4 Creating a Service Bus Topic and Subscriptions Using the Visual Studio Tools
Keep in mind that it’s also possible to create and manage these entities programmatically, using the NamespaceManager class in the Microsoft.ServiceBus namespace. The TopicClient and SubscriptionClient classes in the Microsoft.ServiceBus.Messaging namespace, which are used later in this article, send and receive the messages themselves.
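As an illustration of the programmatic route, here’s a minimal sketch that creates the topic and three session-enabled subscriptions with the NamespaceManager class. It isn’t part of the article’s sample projects; the placeholder connection string and topic name are assumptions, and the subscription names simply follow the “Store1Sub” pattern used in the portal steps.

```csharp
using System;
using Microsoft.ServiceBus;            // NamespaceManager
using Microsoft.ServiceBus.Messaging;  // SubscriptionDescription

class EntitySetup
{
  static void Main()
  {
    // Assumption: placeholder values; substitute the connection string
    // and topic name recorded in the portal.
    string serviceBusConnectionString = "<your connection string>";
    string topicName = "<your topic name>";

    NamespaceManager namespaceManager =
      NamespaceManager.CreateFromConnectionString(serviceBusConnectionString);

    // Create the topic only if it doesn't exist yet
    if (!namespaceManager.TopicExists(topicName))
      namespaceManager.CreateTopic(topicName);

    // One subscription per store: Store1Sub, Store2Sub, Store3Sub
    for (int store = 1; store <= 3; store++)
    {
      string subscriptionName = "Store" + store + "Sub";
      if (namespaceManager.SubscriptionExists(topicName, subscriptionName))
        continue;

      // RequiresSession mirrors the "Enable sessions" checkbox in the portal
      var description = new SubscriptionDescription(topicName, subscriptionName)
      {
        RequiresSession = true
      };
      namespaceManager.CreateSubscription(description);
      Console.WriteLine("Created subscription {0}", subscriptionName);
    }
  }
}
```

The existence checks make the sketch safe to run more than once against the same namespace.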
Once the basic structure for the messaging workflow has been created, we’ll simulate traffic using two console applications created in Visual Studio, as shown in Figure 5. The first console application, MSDNSender, will send the products catalog. The second, MSDNReceiver, will receive the information in each of the stores. We’ll analyze the code in the following sections. In the Pub/Sub pattern, the MSDNSender is the publisher and the MSDNReceiver is the subscriber.
Figure 5 Visual Studio Solution to Simulate the Products Catalog Scenario
Sending the Products Catalog from Headquarters
As you can see in Figure 2, Headquarters (the publisher) sends messages to a topic. This logic is represented by the code in the main file, Program.cs, a part of the MSDNSender project. Program.cs encapsulates the logic and code to send a list of products as individual messages to the topic. Let’s take a look at the different sections, starting with the Main method. Notice that first we create a client for the topic, as follows:
// Create a topicClient using the
// Service Bus credentials
TopicClient topicClient =
  TopicClient.CreateFromConnectionString(
    serviceBusConnectionString, topicName);
Once a topicClient is created, the publisher can send messages using it. The list of products to be sent is stored in an XML file called ProductsCatalog.xml, which contains a list of 10 product entities that will be transformed into an array of objects. The products will then get mapped into the Catalog and Product classes stored in the Product.cs file:
// Deserialize XML file with Products, and store them in an object array
Catalog catalog = null;
string path = "ProductsCatalog.xml";
XmlSerializer serializer = new XmlSerializer(typeof(Catalog));
// The using block closes the reader even if deserialization throws
using (StreamReader reader = new StreamReader(path))
{
  catalog = (Catalog) serializer.Deserialize(reader);
}
Each Product in the catalog array has the structure shown in Figure 6.
Figure 6 Class Representation for Products in the Catalog
public class Product
{
  [System.Xml.Serialization.XmlElement("ProductId")]
  public string ProductId { get; set; }
  [System.Xml.Serialization.XmlElement("ProductName")]
  public string ProductName { get; set; }
  [System.Xml.Serialization.XmlElement("Category")]
  public string Category { get; set; }
  [System.Xml.Serialization.XmlElement("CatalogPage")]
  public int CatalogPage { get; set; }
  [System.Xml.Serialization.XmlElement("MSRP")]
  public double MSRP { get; set; }
  [System.Xml.Serialization.XmlElement("Store")]
  public string Store { get; set; }
}
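Figure 6 covers the Product class only. The Catalog wrapper and the layout of ProductsCatalog.xml aren’t listed here, so the following is a sketch under assumptions: a root Catalog element containing repeated Product elements, a Products array property, and a hypothetical CatalogLoader helper. Product is trimmed to three fields to keep the example short, and the XML is read from a string rather than a file.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

// Trimmed version of the Product class (three fields only)
public class Product
{
  [XmlElement("ProductId")]
  public string ProductId { get; set; }
  [XmlElement("Category")]
  public string Category { get; set; }
  [XmlElement("CatalogPage")]
  public int CatalogPage { get; set; }
}

// Assumption: <Catalog> is the root and holds repeated <Product> children
[XmlRoot("Catalog")]
public class Catalog
{
  [XmlElement("Product")]
  public Product[] Products { get; set; }
}

// Hypothetical helper, not part of the article's projects
public static class CatalogLoader
{
  public static Catalog Load(TextReader reader)
  {
    var serializer = new XmlSerializer(typeof(Catalog));
    return (Catalog)serializer.Deserialize(reader);
  }
}

public static class CatalogDemo
{
  public static void Main()
  {
    const string xml =
      "<Catalog>" +
      "<Product><ProductId>P1</ProductId><Category>Tools</Category>" +
      "<CatalogPage>1</CatalogPage></Product>" +
      "<Product><ProductId>P2</ProductId><Category>Toys</Category>" +
      "<CatalogPage>2</CatalogPage></Product>" +
      "</Catalog>";

    using (var reader = new StringReader(xml))
    {
      Catalog catalog = CatalogLoader.Load(reader);
      foreach (Product p in catalog.Products)
        Console.WriteLine("{0}: page {1}, category {2}",
          p.ProductId, p.CatalogPage, p.Category);
    }
  }
}
```

Placing XmlElement on the array property maps each Product element directly under the root, without an extra wrapper element. If the real file nests products differently, the attributes would need to change accordingly.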
Inside the array loop, a call to the CreateMessage method extracts different properties from the Product objects and assigns them to the message to be sent. Two properties require extra attention:
if (isLastProductInArray)
  message.Properties.Add("IsLastMessageInSession", "true");
message.SessionId = catalogName;
Sessions are extremely important, because they allow the receiver to determine whether all the messages that belong to a specific logical group have arrived. In this case, by setting the SessionId message property, we’re specifying that the receiver shouldn’t use the catalog information until after all the messages with the same catalogName value have arrived. Also, for the last product in the array, we’re adding a new property: IsLastMessageInSession, which will allow the receivers to determine if the last message in the session has arrived, and the catalog can be fully processed. Figure 7 shows MSDNSender running.
Figure 7 Execution of the MSDNSender Project
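The CreateMessage method and the sending loop aren’t listed in full in this article. One way they might look is sketched below, assuming an empty message body with the product fields carried as message properties. The CatalogSender class, the SendCatalog method and the trimmed Product class are hypothetical names introduced for illustration.

```csharp
using Microsoft.ServiceBus.Messaging;

// Trimmed version of the Product class from Figure 6
public class Product
{
  public string ProductId { get; set; }
  public string Category { get; set; }
  public int CatalogPage { get; set; }
  public string Store { get; set; }
}

static class CatalogSender
{
  // Hypothetical reconstruction: copies product fields into message
  // properties so receivers (and subscription rules) can read them
  static BrokeredMessage CreateMessage(
    Product product, string catalogName, bool isLastProductInArray)
  {
    var message = new BrokeredMessage();
    message.Properties.Add("ProductId", product.ProductId);
    message.Properties.Add("Category", product.Category);
    message.Properties.Add("CatalogPage", product.CatalogPage);
    message.Properties.Add("Store", product.Store);

    // Flag the final message so receivers know the catalog is complete
    if (isLastProductInArray)
      message.Properties.Add("IsLastMessageInSession", "true");

    // Every message in the same catalog shares one session
    message.SessionId = catalogName;
    return message;
  }

  // Sends each product as an individual message to the topic
  public static void SendCatalog(
    TopicClient topicClient, Product[] products, string catalogName)
  {
    for (int i = 0; i < products.Length; i++)
    {
      bool isLast = (i == products.Length - 1);
      using (BrokeredMessage message =
        CreateMessage(products[i], catalogName, isLast))
      {
        topicClient.Send(message);
      }
    }
  }
}
```

Carrying CatalogPage as a property matches the receiver code, which reads message.Properties["CatalogPage"] to decide whether a message is in sequence.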
Receiving the Product Catalog Using Subscriptions at the Stores
Now that the catalog and products have been sent out to the topic and copied to the different subscriptions, let’s turn our attention to the MSDNReceiver project, where messages are received and processed. Note that in the Main method of Program.cs, the code creates a client for the Subscription based on information provided by the user via a Console.ReadLine command. Users are expected to enter their store number, which reflects the messages they wish to receive. In short, each branch store is concerned only with messages that apply to that store:
Console.WriteLine("Enter Store Number");
string storeNumber = Console.ReadLine();
Console.WriteLine("Selecting Subscription for Store...");
// Create a Subscription Client to the Topic
SubscriptionClient subscriptionClient =
  SubscriptionClient.CreateFromConnectionString(
    serviceBusConnectionString, topicName,
    "Store" + storeNumber.Trim() + "Sub",
    ReceiveMode.PeekLock);
Because we’re receiving messages from the subscriptions based on sessions (as explained in the previous section), we need to request the next available session using the following line of code:
MessageSession sessionReceiver =
  subscriptionClient.AcceptMessageSession(TimeSpan.FromSeconds(5));
This means the client checks the subscription for any to-be-processed messages whose SessionId property is not null. If no such messages are found within five seconds, the request times out and the receiver application terminates. If a session is found, the ReceiveSessionMessages method is called. Before we jump into this piece of code, let’s discuss the concept of session state, which allows the developer to store information that can be used while messages that belong to the same session are received. In this case, we’re using session state to “remember” the last catalog page that was received, as well as the messages (products) that arrived out of order.
Based on this, here’s the workflow in code:
- The current message is received in the ReceiveSessionMessages method (see Figure 8), which relies on the ProcessMessage method (Figure 9) to process it.
- Inside the ProcessMessage method, if the message is out of sequence, it’s automatically deferred and its ID stored in the session state. Otherwise, it’s marked as “complete” and removed from the subscription. Also, the next expected sequence (the catalog page) is stored in the session.
- After the current received message has been processed, the subsequent code in ReceiveSessionMessages checks for deferred message IDs in the session, and tries to process them again based on the latest Catalog page.
- Once all the messages have been received for the session, the receiver is closed.
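The steps above can be replayed in memory, without a Service Bus connection, to see how deferral restores the page order. The SequenceState class below is a hypothetical stand-in that only mirrors the method names used in Figures 8 and 9; it assumes catalog pages start at 1 and that the lookup returns -1 when no deferred message matches the next expected page. The OrderingDemo class and its dictionary of deferred messages are illustrative substitutes for the broker.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the article's SequenceState class
public class SequenceState
{
  private int nextSequenceId = 1; // assumption: pages start at 1
  // Catalog page -> sequence number of the deferred message
  private readonly Dictionary<int, long> deferred =
    new Dictionary<int, long>();

  public int GetNextSequenceId() { return nextSequenceId; }
  public void SetNextSequenceId(int id) { nextSequenceId = id; }

  public void AddOutOfSequenceMessage(int catalogPage, long sequenceNumber)
  {
    deferred[catalogPage] = sequenceNumber;
  }

  // Sequence number of the deferred message for the next expected
  // page, or -1 if that page hasn't been deferred
  public long GetNextOutOfSequenceMessage()
  {
    long sequenceNumber;
    return deferred.TryGetValue(nextSequenceId, out sequenceNumber)
      ? sequenceNumber : -1;
  }
}

public static class OrderingDemo
{
  // Replays the receive loop; arrivalOrder lists catalog pages
  // in the order the messages arrive
  public static List<int> Process(IList<int> arrivalOrder)
  {
    var state = new SequenceState();
    var processed = new List<int>();
    // Sequence number -> page; plays the role of the broker
    var deferredMessages = new Dictionary<long, int>();

    for (int i = 0; i < arrivalOrder.Count; i++)
    {
      Handle(arrivalOrder[i], i + 1, state, processed, deferredMessages);

      // Call back deferred messages that are now in sequence
      long seq;
      while ((seq = state.GetNextOutOfSequenceMessage()) != -1)
        Handle(deferredMessages[seq], seq, state, processed,
          deferredMessages);
    }
    return processed;
  }

  static void Handle(int page, long sequenceNumber, SequenceState state,
    List<int> processed, Dictionary<long, int> deferredMessages)
  {
    if (state.GetNextSequenceId() == page)
    {
      processed.Add(page);               // corresponds to Complete()
      state.SetNextSequenceId(page + 1);
    }
    else
    {
      state.AddOutOfSequenceMessage(page, sequenceNumber);
      deferredMessages[sequenceNumber] = page; // corresponds to Defer()
    }
  }

  public static void Main()
  {
    List<int> result = Process(new[] { 3, 1, 2, 5, 4 });
    Console.WriteLine(string.Join(", ", result)); // prints "1, 2, 3, 4, 5"
  }
}
```

Pages 3 and 5 arrive early, are deferred, and are called back as soon as the page before them has been processed.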
Figure 8 The ReceiveSessionMessages Method in Code
static void ReceiveSessionMessages(MessageSession receiver)
{
  // Read messages from subscription until subscription is empty
  Console.WriteLine("Reading messages from subscription {0}",
    receiver.Path);
  Console.WriteLine("Receiver Type:" + receiver.GetType().Name);
  Console.WriteLine("Receiver.SessionId = " + receiver.SessionId);
  SequenceState sessionState = GetState(receiver);
  BrokeredMessage receivedMessage;
  while ((receivedMessage = receiver.Receive()) != null)
  {
    string sessionId = receiver.SessionId;
    ProcessMessage(receivedMessage, ref sessionState, receiver);
    while (sessionState.GetNextOutOfSequenceMessage() != -1)
    {
      // Call back deferred messages
      Console.WriteLine("Calling back for deferred message: Category {0}, " +
        "Message sequence {1}", receiver.SessionId,
        sessionState.GetNextSequenceId());
      // Deferred messages are retrieved by their sequence number
      receivedMessage = receiver.Receive(
        sessionState.GetNextOutOfSequenceMessage());
      ProcessMessage(receivedMessage, ref sessionState, receiver);
    }
    // Stop once the message flagged as last in the session is processed
    if (receivedMessage.Properties.ContainsKey(
      "IsLastMessageInSession"))
      break;
  }
  // Clear the session state and close the session
  SetState(receiver, null);
  receiver.Close();
}
Figure 9 The ProcessMessage Method in Code
static void ProcessMessage(BrokeredMessage message, ref SequenceState sessionState,
  MessageSession session = null)
{
  if (session != null)
  {
    int messageId = Convert.ToInt32(message.Properties["CatalogPage"]);
    if (sessionState.GetNextSequenceId() == messageId)
    {
      OutputMessageInfo("RECV: ", message, "State: " + "RECEIVED");
      sessionState.SetNextSequenceId(messageId + 1);
      message.Complete();
      SetState(session, sessionState);
    }
    else
    {
      Console.WriteLine("Deferring message: Category {0}, Catalog Page {1}",
        session.SessionId, messageId);
      sessionState.AddOutOfSequenceMessage(messageId,
        message.SequenceNumber);
      message.Defer();
      SetState(session, sessionState);
    }
  }
  Thread.Sleep(receiverDelay);
}
Keep in mind that for this project, deferred message IDs are stored in the session state, and could potentially be lost. In a production environment, we recommend using some type of persisted storage (Azure Tables is one option) for this purpose. Note that if the message contains the property IsLastMessageInSession (set during the sending process), the session loop is terminated. The console output for the MSDNReceiver project can be seen in Figure 10.
Figure 10 Execution of the MSDNReceiver Project
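The GetState and SetState helpers called in Figures 8 and 9 aren’t listed in this article. A possible shape is sketched below, built on the GetState and SetState methods of MessageSession, which read and write the session state as a stream. Several things here are assumptions: the pared-down SequenceState data class, the choice of DataContractSerializer, and the expectation that GetState returns null when no state has been stored for the session.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization;
using Microsoft.ServiceBus.Messaging;

// Hypothetical, pared-down state object (data only)
[DataContract]
public class SequenceState
{
  [DataMember]
  public int NextSequenceId = 1;
  // Catalog page -> sequence number of the deferred message
  [DataMember]
  public Dictionary<int, long> Deferred = new Dictionary<int, long>();
}

static class SessionStateHelpers
{
  static readonly DataContractSerializer Serializer =
    new DataContractSerializer(typeof(SequenceState));

  // Reads the state stored with the session, or starts fresh
  public static SequenceState GetState(MessageSession session)
  {
    using (Stream stream = session.GetState())
    {
      // Assumption: null means no state was saved for this session
      if (stream == null)
        return new SequenceState();
      return (SequenceState)Serializer.ReadObject(stream);
    }
  }

  // Serializes the state and stores it with the session;
  // passing null clears it, as Figure 8 does before closing
  public static void SetState(MessageSession session, SequenceState state)
  {
    if (state == null)
    {
      session.SetState(null);
      return;
    }
    using (var stream = new MemoryStream())
    {
      Serializer.WriteObject(stream, state);
      stream.Position = 0; // rewind so the full payload is read
      session.SetState(stream);
    }
  }
}
```

Any serializer that can round-trip the state object through a stream would serve equally well; the same serialized payload could also be written to the persisted storage recommended above.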
Azure Service Bus subscriptions give you the ability to create specific rules that filter out messages before they’re consumed. In this case, it would be relatively easy to create a rule that segregates products by category or by store number (which we ignored in this project). Rules can be created programmatically, directly in the Azure portal or through Visual Studio tools.
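As a sketch of the programmatic option, a rule can be attached when a subscription is created by passing a SqlFilter to NamespaceManager.CreateSubscription. The example assumes that the sender copies each product’s Category into a message property of the same name; the placeholder connection values and the 'Tools' category are hypothetical.

```csharp
using Microsoft.ServiceBus;            // NamespaceManager
using Microsoft.ServiceBus.Messaging;  // SubscriptionDescription, SqlFilter

class FilteredSubscriptionSetup
{
  static void Main()
  {
    // Assumption: placeholder values; substitute your own
    string serviceBusConnectionString = "<your connection string>";
    string topicName = "<your topic name>";

    NamespaceManager namespaceManager =
      NamespaceManager.CreateFromConnectionString(serviceBusConnectionString);

    var description = new SubscriptionDescription(topicName, "Store1Sub")
    {
      RequiresSession = true
    };

    // Only messages whose Category property isn't 'Tools' are copied
    // to this subscription (hypothetical category name)
    var filter = new SqlFilter("Category <> 'Tools'");

    // Throws if a subscription with this name already exists
    namespaceManager.CreateSubscription(description, filter);
  }
}
```

One caveat for this particular project: the receiver expects consecutive catalog pages and a final message flagged with IsLastMessageInSession. A filter that removes a page, or the flagged message, would leave later messages deferred, so a filtered design would likely need a different sequencing or completion signal. This sketch doesn’t address that.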
Wrapping Up
The Azure Service Bus offers an amazingly robust and flexible implementation of the publish/subscribe pattern. Many different scenarios can be addressed through the use of topics and subscriptions. The ability to support multiple senders broadcasting messages to multiple receivers, combined with the ability to logically group and sort messages, opens up a world of possibilities for the modern developer. Moreover, being able to leverage a persistent session to track state makes it straightforward to logically group messages and control their sequence. In a world where distributed environments are the norm, understanding how to use messaging patterns and the tools around them is crucial for today’s software architects working in the cloud.
Bruno Terkaly is a developer evangelist for Microsoft. His depth of knowledge comes from years of experience in the field, writing code using a multitude of platforms, languages, frameworks, SDKs, libraries and APIs. He spends time writing code, blogging and giving live presentations on building cloud-based applications, specifically using the Azure platform.
Ricardo Villalobos is a seasoned software architect with more than 15 years of experience designing and creating applications for companies in the supply chain management industry. Holding different technical certifications, as well as a master’s degree in business administration from the University of Dallas, he works as a cloud architect in the Azure CSV incubation group for Microsoft.
Thanks to the following technical expert for reviewing this article: Abhishek Lal