Partager via


An Introduction to Service Bus Topics

In the May CTP of Service Bus, we’ve added a brand-new set of cloud-based, message-oriented-middleware technologies including reliable message queuing and durable publish/subscribe messaging. Last week I posted the Introduction to Service Bus Queues blog entry. This post follows on from that and provides an introduction to the publish/subscribe capabilities offered by Service Bus Topics. Again, I’m not going to cover all the features in this article, I just want to give you enough information to get started with the new feature. We’ll have follow-up posts that drill into some of the details.

I’m going to continue with the retail scenario that I started in the queues blog post. Recall that sales data from individual Point of Sale (POS) terminals needs to be routed to an inventory management system which uses that data to determine when stock needs to be replenished. Each POS terminal reports its sales data by sending messages to the DataCollectionQueue where they sit until they are received by the inventory management system as shown below:

Now let’s evolve this scenario. A new requirement has been added to the system: the store owner wants to be able to monitor how the store is performing in real-time.

To address this requirement we need to take a “tap” off the sales data stream. We still want each message sent by the POS terminals to be sent to the Inventory Management System as before but we want another copy of each message that we can use to present the dashboard view to the store owner.

In any situation like this, where you need each message to be consumed by multiple parties, you need the Service Bus Topic feature. Topics provide the publish/subscribe pattern in which each published message is made available to each subscription registered with the Topic. Contrast this with the queue where each message is consumed by a single consumer. That’s the key difference between the two models.

Messages are sent to a topic in exactly the same way as they are sent to a queue but messages aren’t received from the topic directly, instead they are received from subscriptions. You can think of a topic subscription like a virtual queue that gets copies of the messages that are sent to the topic. Messages are received from a subscription in exactly the same way as they are received from a queue.

So, going back to the scenario, the first thing to do is to switch out the queue for a topic and add a subscription that will be used by the Inventory Management System. So, the system would now look like this:

The above configuration would perform identically to the previous queue-based design. That is, messages sent to the topic would be routed to the Inventory subscription from where the Inventory Management System would consume them.

Now, in order to support the management dashboard, we need to create a second subscription on the topic as shown below:

Now, with the above configuration, each message from the POS terminals will be made available to both the Dashboard and Inventory subscriptions.


Show Me the Code

I described how to sign-up for a Service Bus account and create a namespace in the queues blog post so I won’t cover that again here. Recall that to use the Service Bus namespace, an application needs to reference the AppFabric Service Bus DLLs, namely Microsoft.ServiceBus.dll and Microsoft.ServiceBus.Messaging.dll. You can find these as part of the SDK download.


Creating the Topic and Subscriptions

Management operations for Service Bus messaging entities (queues and topics) are performed via the ServiceBusNamespaceClient which is constructed with the base address of the Service Bus namespace and the user credentials. The ServiceBusNamespaceClient provides methods to create, enumerate and delete messaging entities. The snippet below shows how the ServiceBusNamespaceClient is used to create the DataCollectionTopic.

 

    Uri ServiceBusEnvironment.CreateServiceUri("sb", "ingham-blog", string.Empty);

    string name = "owner";

    string key = "abcdefghijklmopqrstuvwxyz";

  

    ServiceBusNamespaceClient namespaceClient = new ServiceBusNamespaceClient(

        baseAddress, TransportClientCredentialBase.CreateSharedSecretCredential(name, key) );

 

     Topic dataCollectionTopic = namespaceClient.CreateTopic("DataCollectionTopic");
  

Note that there are overloads of the CreateTopic method that allow properties of the topic to be tuned, for example, to set the default time-to-live to be applied to messages sent to the topic. Next, let’s add the Inventory and Dashboard subscriptions.

     dataCollectionTopic.AddSubscription("Inventory");
     dataCollectionTopic.AddSubscription("Dashboard");
  

Sending Messages to the Topic

As I mentioned earlier, applications send messages to a topic in the same way that they send to a queue so the code below will look very familiar if you read the queues blog post. The difference is the application creates a TopicClient instead of a QueueClient.

For runtime operations on Service Bus entities, i.e., sending and receiving messages, an application first needs to create a MessagingFactory. The base address of the ServiceBus namespace and the user credentials are required.

    Uri ServiceBusEnvironment.CreateServiceUri("sb", "ingham-blog", string.Empty);

    string name = "owner";

    string key = "abcdefghijklmopqrstuvwxyz";

 

    MessagingFactory factory = MessagingFactory.Create(

        baseAddress, TransportClientCredentialBase.CreateSharedSecretCredential(name, key) );

 

From the factory, a TopicClient is created for the particular topic of interest, in our case, the DataCollectionTopic.

 

    TopicClient topicClient = factory.CreateTopicClient("DataCollectionTopic");

 

A MessageSender is created from the TopicClient to perform the send operations.

 

    MessageSender ms = topicClient.CreateSender();

 

Messages sent to, and received from, Service Bus topics (and queues) are instances of the BrokeredMessage class which consists of a set of standard properties (such as Label and TimeToLive), a dictionary that is used to hold application properties, and a body of arbitrary application data. An application can set the body by passing in any serializable object into CreateMessage (the example below passes in a SalesData object representing the sales data from the POS terminal) which will use the DataContractSerializer to serialize the object. Alternatively, a System.IO.Stream can be provided.

 

    BrokeredMessage bm = BrokeredMessage.CreateMessage(salesData);

    bm.Label = "SalesReport";

    bm.Properties["StoreName"] = "Redmond";

    bm.Properties["MachineID"] = "POS_1";

 

    ms.Send(bm);


Receiving Messages from a Subscription

Just like when using queues, messages are received from a subscription using a MessageReceiver. The difference is that the MessageReceiver is created from a SubscriptionClient rather than a QueueClient. Everything else remains the same including support for the two different receive modes (ReceiveAndDelete and PeekLock) that I discussed in the queues blog post.

So, first we create the SubscriptionClient, passing the name of the topic and the name of the subscription as parameters. Here I’m using the Inventory subscription.

     SubscriptionClient subClient = factory.CreateSubscriptionClient("DataCollectionTopic", "Inventory");
  

Next we create the MessageReceiver and receive a message.

    MessageReceiver mr = subClient.CreateReceiver();

    BrokeredMessage receivedMessage = mr.Receive();

    try

    {

        ProcessMessage(receivedMessage);

        receivedMessage.Complete();

    }

    catch (Exception e)

    {

        receivedMessage.Abandon();

    }


Subscription Filters

So far, I’ve said that all messages sent to the topic are made available to all registered subscriptions. The key phrase there is “made available”. While Service Bus subscriptions see all messages sent to the topic, it is possible to only copy a subset of those messages to the virtual subscription queue. This is done using subscription filters. When a subscription is created, it’s possible to supply a filter expression in the form of a SQL92 style predicate that can operate over the properties of the message, both the system properties (e.g., Label) and the application properties, such as StoreName in the above example.

Let’s evolve the scenario a little to illustrate this. A second store is to be added to our retail scenario. Sales data from all of the POS terminals from both stores still need to be routed to the centralized Inventory Management System but a store manager using the Dashboard tool is only interested in the performance of her store. We can use subscription filtering to achieve this. Note that when the POS terminals publish messages, they set the StoreName application property on the message. Now we have two stores, let’s say Redmond and Seattle, the POS terminals in the Redmond store stamp their sales data messages with a StoreName of Redmond while the Seattle store POS terminals use a StoreName of Seattle. The store manager of the Redmond store only wishes to see data from its POS terminals. Here’s how the system would look:

To set this routing up, we need to make a simple change to how we’re creating the Dashboard subscription as follows:

     dataCollectionTopic.AddSubscription("Dashboard", new SqlFilterExpression("StoreName = 'Redmond'");
  

With this subscription filter in place, only messages with the StoreName property set to Redmond will be copied to the virtual queue for the Dashboard subscription.

There is a bigger story to tell around subscription filtering. Applications have an option to have multiple filter rules per subscription and there’s also the ability to modify the properties of a message as it passes in to a subscription’s virtual queue. We’ll cover these advanced topics in a separate blog post.


Wrapping up

Hopefully this post has shown you how to get started with the topic-based publish/subscribe feature being introduced in the new May CTP of Service Bus.

It’s worth noting that all of the reasons for using queuing that I mentioned in the introduction to queuing blog post also apply to topics, namely:

  • Temporal decoupling – message producers and consumers do not have to be online at the same time.
  • Load leveling – peaks in load are smoothed out by the topic allowing consuming applications to be provisioned for average load rather than peak load.
  • Load balancing – just like with a queue, it’s possible to have multiple competing consumers listening on a single subscription with each message being handed off to only one of the consumers, thereby balancing load.
  • Loose coupling – it’s possible to evolve the messaging network without impacting existing endpoints, e.g., adding subscriptions or changing filters to a topic to accommodate new consumers.

We’ve only really just scratched the surface here; we’ll go in to more depth in future posts.

Finally, remember one of the main goals of our CTP release is to get feedback on the service. We’re interested to hear what you think of the Service Bus messaging features. We’re particularly keen to get your opinion of the API. So, if you have suggestions, critique, praise, or questions, please let us know at https://social.msdn.microsoft.com/Forums/en-US/appfabricctp/. Your feedback will help us improve the service for you and other users like you.

Comments

  • Anonymous
    June 14, 2011
    Nice post. I love the way the AppFabric is developing!Interesting that you chose to use SQL-like expressions. Is there any reason why you didn't go for a lambda expression approach to take advantage of strong typing?
  • Anonymous
    June 14, 2011
    The SQL expressions are, in fact, compiled down into LINQ expressions. We just haven't made that available, yet. SQL expressions are the canonical filter language in most pub/sub systems so we gave priority to that style.
  • Anonymous
    September 29, 2011
    Nice Post!Is it possible to specify additional metadata on a subscrption?I'd like to be able to store information like who created the subscription etc...Also the management portal doesn't allow you to specify any filters. Is it only supported via the API?
  • Anonymous
    October 31, 2011
    Good One !!