WCF Essentials
What You Need To Know About One-Way Calls, Callbacks, And Events
Juval Lowy
This article is based on a prerelease version of the .NET Framework 3.0. All information herein is subject to change.
This article uses the following technologies: .NET Framework 3.0
Code download available at: WCF2006_10.exe (345 KB)
Contents
One-Way Operations
Callback Operations
Client Callback Setup
Callback Reentrancy
Events
The Publish-Subscribe Framework
Managing Persistent Subscribers
Event Publishing
Administering Persistent Subscribers
Queued Publishers and Subscribers
Conclusion
The classic object and component-oriented programming models offer only a single way for clients to call a method: the client issues a call, blocks while the call is in progress, and then continues executing once the method returns. Windows® Communication Foundation (WCF) supports this classic invocation model, but it also provides built-in support for two additional operation types: one-way calls for fire-and-forget operations and duplex callbacks that let the service call back to the client.
By default, Windows Communication Foundation operations are of the type known as request-reply: the client issues a request in the form of a message and blocks until it gets the reply message. If the service does not respond within a default timeout of one minute, the client will get a TimeoutException. In addition, the proxy will throw an exception on the client side if there were any communication or service-side exceptions. With the exception of NetPeerTcpBinding and NetMsmqBinding, all bindings support request-reply operations.
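As a rough illustration of the request-reply behavior just described, the following sketch shortens the send timeout on a binding and handles the two client-side failure modes. It builds the channel with ChannelFactory<T> rather than a tool-generated proxy. The RequestReplyClient class, the ten-second value, and the address parameter are assumptions made for this example and do not come from the code download.

```csharp
using System;
using System.ServiceModel;

[ServiceContract]
interface IMyContract
{
    [OperationContract] // request-reply is the default
    void MyMethod();
}

// Hypothetical helper, for illustration only.
static class RequestReplyClient
{
    public static NetTcpBinding CreateBinding()
    {
        NetTcpBinding binding = new NetTcpBinding();
        // The default is one minute; ten seconds is an arbitrary example value.
        binding.SendTimeout = TimeSpan.FromSeconds(10);
        return binding;
    }

    // Returns true if the reply arrived, false on a timeout or communication error.
    public static bool TryCall(string address)
    {
        ChannelFactory<IMyContract> factory =
            new ChannelFactory<IMyContract>(CreateBinding(), new EndpointAddress(address));
        try
        {
            IMyContract proxy = factory.CreateChannel();
            proxy.MyMethod(); // blocks until the reply arrives or the timeout expires
            factory.Close();
            return true;
        }
        catch (TimeoutException)
        {
            factory.Abort();
            return false;
        }
        catch (CommunicationException) // includes faults returned by the service
        {
            factory.Abort();
            return false;
        }
    }
}
```

A caller would pass the service address, for example RequestReplyClient.TryCall("net.tcp://localhost:8000/MyService"), where the address is again only a placeholder.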
I'll start here by presenting the ways of invoking and calling Windows Communication Foundation operations and the related design guidelines, then illustrate using these operation types as the building blocks of a custom framework for publishing and subscribing to events. In the process, I'll show you a variety of advanced Microsoft® .NET Framework and Windows Communication Foundation programming techniques.
One-Way Operations
There are cases when an operation has no returned values and the client does not care about the success or failure of the invocation. To support this sort of fire-and-forget invocation, Windows Communication Foundation offers one-way operations. After the client issues the call, Windows Communication Foundation generates a request message, but no correlated reply message will ever return to the client. As a result, one-way operations can't return values, and any exception thrown on the service side will not make its way to the client.
One-way calls do not equate to asynchronous calls. When one-way calls reach the service, they may not be dispatched all at once and may be queued up on the service side to be dispatched one at a time, all according to the service's configured concurrency mode and session mode. How many messages (whether one-way or request-reply) the service is willing to queue up depends on the configured channel and the reliability mode. If the number of queued messages exceeds the queue's capacity, the client will block, even when issuing a one-way call. However, once the call is queued, the client is unblocked and can continue executing while the service processes the operation in the background. This usually gives the appearance of asynchronous calls. All the Windows Communication Foundation bindings support one-way operations.
The OperationContract attribute offers the Boolean IsOneWay property, which defaults to false, meaning a request-reply operation. But setting IsOneWay to true configures the method as a one-way operation:
    [ServiceContract]
    interface IMyContract
    {
        [OperationContract(IsOneWay = true)]
        void MyMethod();
    }
Because there is no reply associated with a one-way operation, there's no point in having any returned values or results, and the operation must have a void return type without any outgoing parameters. Since Windows Communication Foundation does not have any hooks into the language compiler and can't verify proper usage at compile-time, it enforces this by verifying the method signature when loading up the host at run time and throwing an InvalidOperationException in case of a mismatch.
The fact that the client doesn't care about the result of the invocation does not mean the client doesn't care if the invocation took place at all. In general, you should turn on reliability for your services, even for one-way calls. This will ensure delivery of the requests to the service. However, with one-way calls, the client may or may not care about the invocation order of the one-way operations. This is one of the main reasons why Windows Communication Foundation allows you to separate enabling reliable delivery from enabling ordered delivery and execution.
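A minimal sketch of that separation, assuming a TCP binding configured in code rather than in the config file. The ReliableOneWayBinding helper class is hypothetical; the ReliableSession properties it sets are part of NetTcpBinding.

```csharp
using System;
using System.ServiceModel;

// Hypothetical helper, for illustration only.
static class ReliableOneWayBinding
{
    public static NetTcpBinding Create()
    {
        NetTcpBinding binding = new NetTcpBinding();
        binding.ReliableSession.Enabled = true;  // messages are delivered reliably...
        binding.ReliableSession.Ordered = false; // ...but not necessarily in the order sent
        return binding;
    }
}
```

A client that does care about the order of its one-way calls would leave Ordered at its default of true.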
Callback Operations
Windows Communication Foundation supports allowing the service to call back to its clients. During a callback, in many respects the tables are turned: the service is the client and the client becomes the service (see Figure 1).
Figure 1 A Callback Operation
The client also has to facilitate hosting the callback object. Not all bindings support callback operations. Because of its connectionless nature, HTTP can't be used for callbacks and therefore you can't use callbacks over BasicHttpBinding or WSHttpBinding. Windows Communication Foundation offers callback support for NetTcpBinding and NetNamedPipeBinding because the underlying transport is bidirectional. To support callbacks over HTTP, Windows Communication Foundation provides WSDualHttpBinding, which actually sets up two HTTP channels: one for the calls from the client to the service and one for the calls from the service to the client.
A service contract can have at most one callback contract. Once the callback contract is defined, the clients are required to support the callback and also to provide the callback endpoint to the service in every call. The ServiceContract attribute offers the CallbackContract property of the type Type. You need to set it to the callback contract type and provide the definition of the callback contract, as shown here:
    interface IMyContractCallback
    {
        [OperationContract]
        void OnCallback();
    }
    [ServiceContract(CallbackContract = typeof(IMyContractCallback))]
    interface IMyContract
    {
        [OperationContract]
        void DoSomething();
    }
Note that the callback contract need not be marked with a ServiceContract attribute—it is implied.
The client-side imported callback interface will not necessarily have the same name as in the original service-side definition. Instead, it will have the name of the service contract interface suffixed with the word Callback.
Client Callback Setup
It is up to the client to host the callback object and expose a callback endpoint. The innermost execution scope of the service instance is the instance context:
    public sealed class InstanceContext : CommunicationObject, ...
    {
        public InstanceContext(object implementation);
        ... // More members
    }
All the client needs to do to host a callback object is to instantiate the callback object and construct a context around it:
    class MyCallback : IMyContractCallback
    {
        public void OnCallback()
        {...}
    }
    IMyContractCallback callback = new MyCallback();
    InstanceContext context = new InstanceContext(callback);
Whenever interacting with a service endpoint whose contract defines a callback contract, the client must use a proxy that will set up the bidirectional communication and pass the callback endpoint reference to the service. The proxy the client uses must derive from the specialized proxy class DuplexClientBase<T> shown in Figure 2.
Figure 2 DuplexClientBase
    public interface IDuplexContextChannel : IContextChannel
    {
        InstanceContext CallbackInstance { get; set; }
        ... // More members
    }
    public class DuplexClientBase<T> : ClientBase<T> where T : class
    {
        protected DuplexClientBase(InstanceContext callbackContext);
        protected DuplexClientBase(InstanceContext callbackContext,
                                   string endpointName);
        public IDuplexContextChannel InnerDuplexChannel { get; }
        ... // More members
    }
The client provides the constructor of DuplexClientBase<T> with the instance context hosting the callback object (as well as the service endpoint information as with a regular proxy). The proxy will construct an endpoint around the callback context, while inferring the details of the callback endpoint from the service endpoint configuration. The callback endpoint contract is the one defined by the service contract callback type. The callback endpoint will use the same binding (or transport, actually) as the outgoing call. Windows Communication Foundation will use the client's machine name for the address and even select a port when using HTTP. Simply passing the instance context to the duplex proxy and using the proxy to call the service will expose that client-side callback endpoint.
When using SvcUtil or Visual Studio® 2005 to generate a proxy class, the tools will generate a class derived from DuplexClientBase<T> as shown in Figure 3. The client constructs a callback instance, hosts it in a context, creates a proxy, and calls the service, thus passing the callback endpoint reference:
    class MyCallback : IMyContractCallback
    {
        public void OnCallback()
        {...}
    }
    IMyContractCallback callback = new MyCallback();
    InstanceContext context = new InstanceContext(callback);
    MyContractClient proxy = new MyContractClient(context);
    proxy.DoSomething();
Figure 3 Tool-Generated Duplex Proxy
    partial class MyContractClient : DuplexClientBase<IMyContract>,IMyContract
    {
        public MyContractClient(InstanceContext inputInstance)
            : base(inputInstance)
        {}
        public MyContractClient(InstanceContext inputInstance,
                                string endpointName)
            : base(inputInstance,endpointName)
        {}
        ... // More constructors
        public void DoSomething()
        {
            Channel.DoSomething();
        }
    }
Note that as long as the client is expecting callbacks, it can't close the proxy. Doing so would close the callback endpoint and cause an error on the service side when the service tried to call back. It is often the case that the client itself implements the callback contract, in which case the client will typically use a member variable for the proxy and close it when the client is disposed, as shown in Figure 4.
Figure 4 Client Implementing Callback Contract
    class MyClient : IMyContractCallback, IDisposable
    {
        MyContractClient m_Proxy;

        public void CallService()
        {
            InstanceContext context = new InstanceContext(this);
            m_Proxy = new MyContractClient(context);
            m_Proxy.DoSomething();
        }
        public void OnCallback()
        {...}
        public void Dispose()
        {
            m_Proxy.Close();
        }
    }
The client-side callback endpoint reference is passed along with every call the client makes to the service and is part of the incoming message. The OperationContext class provides the service with easy access to the callback reference via the generic method GetCallbackChannel<T>. What exactly the service does with the callback reference and when it decides to use the reference is completely at the discretion of the service. The service can extract the callback reference from the operation context and store it for later use or the service can use the reference during the service operation to call back to the client. Figure 5 demonstrates the first option.
Figure 5 Storing the Callback References for Later Use
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
    class MyService : IMyContract
    {
        static List<IMyContractCallback> m_Callbacks =
            new List<IMyContractCallback>();

        public void DoSomething()
        {
            IMyContractCallback callback =
                OperationContext.Current.GetCallbackChannel<IMyContractCallback>();
            if(!m_Callbacks.Contains(callback))
            {
                m_Callbacks.Add(callback);
            }
        }
        public static void CallClients()
        {
            m_Callbacks.ForEach(delegate(IMyContractCallback callback)
            {
                callback.OnCallback();
            });
        }
    }
Using the same definition of the callback contract as shown earlier, the service uses a static generic list to store references to interfaces of the type IMyContractCallback. Because the service does not know which client is calling it and whether the client has called it already, the service checks in every call to see whether the list already contains the callback reference. If the list does not contain the reference, the service adds the callback to the list. The service class also offers the static method CallClients. Any party on the host side can simply use that to call back to the clients:
MyService.CallClients();
Invoked this way, the invoking party is using some host-side thread for the callback invocation. That thread is unrelated to any thread executing an incoming service call.
Callback Reentrancy
The service may also want to invoke the callback reference passed in or invoke the list of callbacks during the execution of a contract operation. However, such invocations are disallowed because by default the service class is configured for single-threaded access: the service instance is associated with a lock, and only one thread at a time can own the lock and access the service instance. Calling out to the clients during an operation requires blocking the service thread while invoking the callbacks. The problem is that processing the reply message from the client once the callback returns requires ownership of the same lock, and so a deadlock would occur.
To avoid a deadlock, if the single-threaded service instance tries to call back to its clients, Windows Communication Foundation will throw an InvalidOperationException. There are three possible solutions. The first is to configure the service for multiple-threaded access, which would not associate it with a lock and would therefore allow callbacks. However, this would also increase the burden on the service developer because of the need to provide synchronization for the service.
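The first option might look roughly like the following sketch. The contract definitions repeat the ones shown earlier; the MyMultithreadedService class name, the m_CallCount field, and the m_Lock object are hypothetical and stand in for whatever state the service would have to protect itself.

```csharp
using System.ServiceModel;

interface IMyContractCallback
{
    [OperationContract]
    void OnCallback();
}

[ServiceContract(CallbackContract = typeof(IMyContractCallback))]
interface IMyContract
{
    [OperationContract]
    void DoSomething();
}

// No WCF-managed lock is associated with the instance, so callbacks are allowed,
// but the service must now synchronize access to its own state.
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Multiple)]
class MyMultithreadedService : IMyContract
{
    readonly object m_Lock = new object(); // hypothetical manual lock
    int m_CallCount;                       // hypothetical shared state

    public void DoSomething()
    {
        lock(m_Lock)
        {
            m_CallCount++;
        }
        // Call back outside the manual lock so the callback cannot block other callers.
        IMyContractCallback callback =
            OperationContext.Current.GetCallbackChannel<IMyContractCallback>();
        callback.OnCallback();
    }
}
```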
The second solution is to configure the service for reentrancy. When configured for reentrancy, the service instance is still associated with a lock, and only single-threaded access is allowed. However, if the service is calling back to its clients, Windows Communication Foundation will silently release the lock first.
The following code demonstrates a service configured for reentrancy. During the operation execution, the service reaches into the operation context, grabs the callback reference, and invokes it. Control will only return to the service after the callback returns, and the service's own thread will need to reacquire the lock:
    [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
    class MyService : IMyContract
    {
        public void DoSomething()
        {
            IMyContractCallback callback =
                OperationContext.Current.GetCallbackChannel<IMyContractCallback>();
            callback.OnCallback();
        }
    }
The third solution that allows the service to safely call back to the client is to have the callback contract operations configured as one-way operations. Doing so enables the service to call back even when concurrency is set to single-threaded, because there will not be any reply message to contend for the lock.
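A sketch of this third option follows. The only change from the earlier callback contract is IsOneWay = true on the callback operation; the service keeps the default single-threaded concurrency mode. The MyOneWayCallbackService class name is made up for the example.

```csharp
using System.ServiceModel;

interface IMyContractCallback
{
    // One-way: no reply message comes back, so nothing contends for the service lock.
    [OperationContract(IsOneWay = true)]
    void OnCallback();
}

[ServiceContract(CallbackContract = typeof(IMyContractCallback))]
interface IMyContract
{
    [OperationContract]
    void DoSomething();
}

// No ServiceBehavior attribute: the default single-threaded concurrency mode applies.
class MyOneWayCallbackService : IMyContract
{
    public void DoSomething()
    {
        IMyContractCallback callback =
            OperationContext.Current.GetCallbackChannel<IMyContractCallback>();
        callback.OnCallback();
    }
}
```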
Events
The canonical use for duplex callbacks is with events. Events allow clients to be notified about something that occurred on the service side. The event may result from a direct client call or it may be the result of something the service monitors. The service firing the event is called the publisher, and the client receiving the event is called the subscriber, as shown in Figure 6.
Figure 6 Publishers and Subscribers
Although events in Windows Communication Foundation are nothing more than callback operations, by their very nature events usually imply a looser relationship between the publisher and the subscriber than between a client and a service. When dealing with events, the service typically publishes the same event to multiple subscribers. The publisher often does not care about the order of the invocation of the subscribers or about errors the subscribers might have while processing the events. In addition, the service does not care about returned results from the subscribers. Consequently, event-handling operations should have a void return type, should not have any outgoing parameters, and should be marked as one-way. I also recommend factoring the events to a separate callback contract and not mixing events with regular callbacks on the same contract:
    interface IMyEvents
    {
        [OperationContract(IsOneWay = true)]
        void OnEvent1();
        [OperationContract(IsOneWay = true)]
        void OnEvent2(int number);
        [OperationContract(IsOneWay = true)]
        void OnEvent3(int number, string text);
    }
However, using raw duplex callbacks for events often introduces too much coupling between the publisher and the subscribers. The subscriber has to know where all the publishing services are in the application and connect to them. Any publisher that the subscriber is unaware of will not be able to notify the subscriber of events. This in turn makes adding new subscribers (or removing existing ones) difficult in an already-deployed application. There is no way for the subscriber to ask to be notified whenever anyone in the application raises a particular type of event. In addition, the subscriber must make multiple, potentially expensive calls to each publisher both to subscribe and to unsubscribe. Different publishers may fire the same event, but offer slightly different ways to subscribe and unsubscribe, which of course couples the subscribers to those methods.
Much the same way, the publisher can only notify subscribers it knows about. There is no way for the publisher to deliver an event to whoever wishes to receive it, nor is there an ability to broadcast an event. Moreover, all publishers must have the necessary code to manage the list of the subscribers and the publishing act itself. This code has almost nothing to do with the business problem the service is designed to solve and can get fairly complex if you want to employ advanced features such as concurrent publishing.
Further, the duplex-based callbacks introduce coupling between the lifeline of the publisher and the subscribers. The subscribers have to be up and running in order to subscribe and receive events. There is no way for a subscriber to ask that if an event is fired, the application should create an instance of the subscriber and let it handle the event.
Finally, setting up subscriptions has to be done programmatically. There is no easy administrative way to configure subscriptions in the application or to change the subscriber's preferences when the system is running.
The solution to these problems is to design around them using the publish-subscribe design pattern and decouple the publishers from the subscribers by introducing a dedicated subscription service and a dedicated publishing service in between, as shown in Figure 7.
Figure 7 Publish-Subscribe System
Subscribers that want to subscribe to events register with the subscription service, which manages the lists of subscribers and also provides a similar ability to unsubscribe. Similarly, all publishers use the publisher service to fire their events and avoid delivering the events directly to the subscribers. The subscription and publishing services provide a layer of indirection that decouples your system. No longer do the subscribers have any knowledge about the identity of the publishers. They can subscribe to a type of an event and will receive the event from any publisher, since the subscription mechanism is uniform across all publishers. In fact, no publisher has to manage any subscription list and the publishers have no idea who the subscribers are. They deliver the event to the publishing service to be supplied to any interested subscriber.
You can even define two types of subscribers: transient subscribers are in-memory, running subscribers, and persistent subscribers are subscribers that persist on the disk, representing services to invoke when the event takes place. For transient subscribers you can use the duplex callback mechanism as a handy way of passing the callback reference to the running service. For the persistent subscribers, all you need to record is the subscriber address as reference. When the event is raised, the publishing service will call to the persistent subscriber address and deliver the event. Another important distinction between the two types of subscriptions is that you can store the persistent subscription on the disk or in a database. Doing so will persist the subscription across application shutdowns or machine crashes and restarts, thus enabling administrative configuration of the subscriptions. Obviously, you can't save the transient subscription across an application shutdown and you'll need to set up the transient subscription explicitly every time the application starts.
The Publish-Subscribe Framework
The source code available with this article contains a complete publish-subscribe example. In this code, I wanted to provide not just sample publish-subscribe services and clients but also a general-purpose framework that automates implementing such services and adding the support for any application. The first step in building the framework was to factor the publish-subscribe management interfaces and to provide separate contracts for publishing for transient and persistent subscriptions.
For managing transient subscriptions, I defined the ISubscriptionService interface:
    [ServiceContract]
    public interface ISubscriptionService
    {
        [OperationContract]
        void Subscribe(string eventOperation);
        [OperationContract]
        void Unsubscribe(string eventOperation);
    }
Note that ISubscriptionService does not identify the callback contract expected by its implementing endpoint. The callback interface is provided in the application by deriving from ISubscriptionService and specifying the desired callback contract:
    [ServiceContract(CallbackContract = typeof(IMyEvents))]
    interface IMySubscriptionService : ISubscriptionService
    {}
The sub interface of ISubscriptionService does not need to add operations. The transient subscription management functionality is provided by ISubscriptionService. In each call to Subscribe or Unsubscribe, the subscriber needs to provide the name of the operation (the event) it wants to subscribe to or unsubscribe from. If the caller wishes to subscribe to or unsubscribe from all events, it can pass an empty or null string.
My framework offers an implementation for the methods of ISubscriptionService in the generic abstract class SubscriptionManager<T>:
    public abstract class SubscriptionManager<T> where T : class
    {
        public void Subscribe(string eventOperation);
        public void Unsubscribe(string eventOperation);
        ... // More members
    }
The generic type parameter for SubscriptionManager<T> is the events contract. Note that SubscriptionManager<T> does not derive from ISubscriptionService.
The application needs to expose its own transient subscription service in the form of an endpoint that supports its specific sub interface of ISubscriptionService. To do this, the application needs to provide a service class that derives from SubscriptionManager<T>, specify the callback contract as a type parameter and derive from the specific sub interface of ISubscriptionService:
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
    class MySubscriptionService : SubscriptionManager<IMyEvents>,
                                  IMySubscriptionService
    {}
There is no need for any code in MySubscriptionService because IMySubscriptionService does not add any new operations and SubscriptionManager<T> already implements the methods of ISubscriptionService.
Finally, the application needs to define an endpoint for IMySubscriptionService:
    <services>
      <service name = "MySubscriptionService">
        <endpoint
          address  = "..."
          binding  = "..."
          contract = "IMySubscriptionService"
        />
      </service>
    </services>
Figure 8 shows how SubscriptionManager<T> manages transient subscriptions. SubscriptionManager<T> stores the transient subscribers in a static generic dictionary called m_TransientStore:
    static Dictionary<string,List<T>> m_TransientStore;
Each entry in the dictionary contains the name of the event operation and all its subscribers in the form of a list. The static constructor of SubscriptionManager<T> uses reflection to get all the operations of the callback interface (the type parameter for SubscriptionManager<T>) and initializes the dictionary to have all the operations with empty lists. The Subscribe method extracts the callback reference from the operation call context. If the caller specifies an operation name, Subscribe calls the helper method AddTransient, which retrieves from the store the list of subscribers for the event. If the list does not contain the subscriber, AddTransient adds it.
Figure 8 Transient Subscribers Management
    public abstract class SubscriptionManager<T> where T : class
    {
        static Dictionary<string,List<T>> m_TransientStore;

        static SubscriptionManager()
        {
            // Start with an empty subscriber list for every operation of the events contract
            m_TransientStore = new Dictionary<string,List<T>>();
            string[] methods = GetOperations();
            Array.ForEach(methods, delegate(string methodName)
            {
                m_TransientStore.Add(methodName, new List<T>());
            });
        }
        static string[] GetOperations()
        {
            MethodInfo[] methods = typeof(T).GetMethods(
                BindingFlags.Public | BindingFlags.FlattenHierarchy |
                BindingFlags.Instance);
            List<string> operations = new List<string>(methods.Length);
            Array.ForEach(methods, delegate(MethodInfo method)
            {
                operations.Add(method.Name);
            });
            return operations.ToArray();
        }
        static void AddTransient(T subscriber,string eventOperation)
        {
            List<T> list = m_TransientStore[eventOperation];
            if(list.Contains(subscriber))
                return;
            list.Add(subscriber);
        }
        static void RemoveTransient(T subscriber,string eventOperation)
        {
            List<T> list = m_TransientStore[eventOperation];
            list.Remove(subscriber);
        }
        public void Subscribe(string eventOperation)
        {
            lock(typeof(SubscriptionManager<T>))
            {
                T subscriber = OperationContext.Current.GetCallbackChannel<T>();
                if(!String.IsNullOrEmpty(eventOperation))
                {
                    AddTransient(subscriber,eventOperation);
                }
                else
                {
                    // Null or empty name: subscribe to every event
                    string[] methods = GetOperations();
                    Array.ForEach(methods, delegate(string methodName)
                    {
                        AddTransient(subscriber,methodName);
                    });
                }
            }
        }
        public void Unsubscribe(string eventOperation)
        {
            lock(typeof(SubscriptionManager<T>))
            {
                T subscriber = OperationContext.Current.GetCallbackChannel<T>();
                if(!String.IsNullOrEmpty(eventOperation))
                {
                    RemoveTransient(subscriber,eventOperation);
                }
                else
                {
                    // Null or empty name: unsubscribe from every event
                    string[] methods = GetOperations();
                    Array.ForEach(methods, delegate(string methodName)
                    {
                        RemoveTransient(subscriber,methodName);
                    });
                }
            }
        }
        ... // More members
    }
If the caller specified an empty string or null for the operation name, Subscribe calls AddTransient for each operation in the callback contract. Unsubscribe operates in a similar manner. Note that the caller can subscribe to all events and then unsubscribe from a particular one.
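The subscribe-all-then-unsubscribe-one behavior can be tried without a running service. The following is a simplified stand-in, not the framework's code: the TransientStoreSketch class and its Count method are hypothetical, the events contract is stripped of its WCF attributes, and the subscriber is passed in explicitly instead of being read from the operation context.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

interface IMyEvents
{
    void OnEvent1();
    void OnEvent2(int number);
    void OnEvent3(int number, string text);
}

// Hypothetical in-memory stand-in for the transient store of SubscriptionManager<T>.
class TransientStoreSketch<T> where T : class
{
    readonly Dictionary<string, List<T>> m_Store = new Dictionary<string, List<T>>();

    public TransientStoreSketch()
    {
        // One empty subscriber list per operation of the events contract.
        foreach (MethodInfo method in typeof(T).GetMethods())
            m_Store[method.Name] = new List<T>();
    }

    public void Subscribe(T subscriber, string eventOperation)
    {
        foreach (string name in Resolve(eventOperation))
            if (!m_Store[name].Contains(subscriber))
                m_Store[name].Add(subscriber);
    }

    public void Unsubscribe(T subscriber, string eventOperation)
    {
        foreach (string name in Resolve(eventOperation))
            m_Store[name].Remove(subscriber);
    }

    public int Count(string eventOperation)
    {
        return m_Store[eventOperation].Count;
    }

    // A null or empty name stands for every operation on the contract.
    string[] Resolve(string eventOperation)
    {
        if (!String.IsNullOrEmpty(eventOperation))
            return new string[] { eventOperation };
        string[] all = new string[m_Store.Count];
        m_Store.Keys.CopyTo(all, 0);
        return all;
    }
}

class MySubscriber : IMyEvents
{
    public void OnEvent1() {}
    public void OnEvent2(int number) {}
    public void OnEvent3(int number, string text) {}
}

static class TransientStoreDemo
{
    // Returns the subscriber counts for OnEvent1 and OnEvent2, in that order.
    public static int[] Run()
    {
        TransientStoreSketch<IMyEvents> store = new TransientStoreSketch<IMyEvents>();
        MySubscriber subscriber = new MySubscriber();
        store.Subscribe(subscriber, null);         // subscribe to all events
        store.Unsubscribe(subscriber, "OnEvent2"); // then opt out of one
        return new int[] { store.Count("OnEvent1"), store.Count("OnEvent2") }; // { 1, 0 }
    }
}
```

Unlike the framework, this sketch does no locking, so it is suitable only for single-threaded experimentation.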
Managing Persistent Subscribers
For managing persistent subscribers, I defined the IPersistentSubscriptionService interface shown in Figure 9.
Figure 9 IPersistentSubscriptionService
    [ServiceContract]
    public interface IPersistentSubscriptionService
    {
        [OperationContract]
        [TransactionFlow(TransactionFlowOption.Allowed)]
        void PersistSubscribe(string address,string eventsContract,
                              string eventOperation);
        [OperationContract]
        [TransactionFlow(TransactionFlowOption.Allowed)]
        void PersistUnsubscribe(string address,string eventsContract,
                                string eventOperation);
        ... // More members
    }
To add a persistent subscriber, the caller needs to call PersistSubscribe and provide the address of the subscriber, the events contract name, and the specific event operation itself. To unsubscribe, the caller uses PersistUnsubscribe with the same information. Note that IPersistentSubscriptionService does not imply in any way where the subscribers persist on the service side—that is an implementation detail.
The class SubscriptionManager<T> presented previously also implements the methods of IPersistentSubscriptionService:
    public abstract class SubscriptionManager<T> where T : class
    {
        public void PersistUnsubscribe(string address,string eventsContract,
                                       string eventOperation);
        public void PersistSubscribe(string address,string eventsContract,
                                     string eventOperation);
        ... // More members
    }
SubscriptionManager<T> stores the persistent subscribers in SQL Server™. Note that SubscriptionManager<T> does not derive from IPersistentSubscriptionService. The application using the framework needs to expose its own persistent subscription service, but there is no need to derive a new contract from IPersistentSubscriptionService because no callback references are required. The application simply derives from SubscriptionManager<T>, specifying the events contract as a type parameter and adding a derivation from IPersistentSubscriptionService:
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
    class MySubscriptionService : SubscriptionManager<IMyEvents>,
                                  IPersistentSubscriptionService
    {}
There is no need for any code in MySubscriptionService because SubscriptionManager<T> already implements the methods of IPersistentSubscriptionService.
Finally, the application needs to define an endpoint for IPersistentSubscriptionService:
    <services>
      <service name = "MySubscriptionService">
        <endpoint
          address  = "..."
          binding  = "..."
          contract = "IPersistentSubscriptionService"
        />
      </service>
    </services>
The implementation of the methods of IPersistentSubscriptionService by SubscriptionManager<T> is shown in Figure 10, which is similar to Figure 8 except the subscribers are stored in SQL Server, not in-memory in a dictionary.
Figure 10 Persistent Subscribers Management
    public abstract class SubscriptionManager<T> where T : class
    {
        static void AddPersistent(string address,string eventsContract,
                                  string eventOperation)
        {
            ... // Uses ADO.NET to store the subscription in SQL Server
        }
        static void RemovePersistent(string address,string eventsContract,
                                     string eventOperation)
        {
            ... // Uses ADO.NET to remove the subscription from SQL Server
        }
        [OperationBehavior(TransactionScopeRequired = true)]
        public void PersistUnsubscribe(string address,string eventsContract,
                                       string eventOperation)
        {
            if(!String.IsNullOrEmpty(eventOperation))
            {
                RemovePersistent(address,eventsContract,eventOperation);
            }
            else
            {
                string[] methods = GetOperations();
                Array.ForEach(methods, delegate(string methodName)
                {
                    RemovePersistent(address,eventsContract,methodName);
                });
            }
        }
        [OperationBehavior(TransactionScopeRequired = true)]
        public void PersistSubscribe(string address,string eventsContract,
                                     string eventOperation)
        {
            if(!String.IsNullOrEmpty(eventOperation))
            {
                AddPersistent(address,eventsContract,eventOperation);
            }
            else
            {
                string[] methods = GetOperations();
                Array.ForEach(methods, delegate(string methodName)
                {
                    AddPersistent(address,eventsContract,methodName);
                });
            }
        }
        ... // More members
    }
If the application needs to support both transient and persistent subscribers for the same events contract, simply derive the subscription service class from both the specialized sub interface of ISubscriptionService and from IPersistentSubscriptionService and expose the two matching endpoints:
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
    class MySubscriptionService : SubscriptionManager<IMyEvents>,
                                  IMySubscriptionService,
                                  IPersistentSubscriptionService
    {}
Event Publishing
My framework also enables easy implementation of the publishing service, which should support the same events contract as the subscribers and should be the only point of contact known to the publishers in the application. Because the publishing service exposes the events contract in an endpoint, you need to mark the events contract as a service contract, even if you only use it for duplex callbacks with transient subscribers:
    [ServiceContract]
    interface IMyEvents
    {...}
The publish-subscribe framework contains the helper class PublishService<T> defined as:
    public abstract class PublishService<T> where T : class
    {
        protected static void FireEvent(params object[] args);
    }
PublishService<T> requires as a type parameter the type of the events contract. To provide your own publishing service, derive from PublishService<T> and use the FireEvent method to deliver the event to all subscribers, whether transient or persistent:
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
class MyPublishService : PublishService<IMyEvents>,IMyEvents
{
    public void OnEvent1()
    {
        FireEvent();
    }
    public void OnEvent2(int number)
    {
        FireEvent(number);
    }
    public void OnEvent3(int number,string text)
    {
        FireEvent(number,text);
    }
}
Note that you can use FireEvent to fire any type of event regardless of the parameters because of the use of the params object array.
Finally, the application needs to expose an endpoint for the publishing service with the events contract:
<services>
   <service name = "MyPublishService">
      <endpoint
         address = "..."
         binding = "..."
         contract = "IMyEvents"
      />
   </service>
</services>
Figure 11 shows the implementation of PublishService<T>.
Figure 11 PublishService<T>
public abstract class PublishService<T> where T : class
{
    protected static void FireEvent(params object[] args)
    {
        // The calling method (one frame up) names the event operation to fire
        StackFrame stackFrame = new StackFrame(1);
        string methodName = stackFrame.GetMethod().Name;

        PublishPersistent(methodName,args);
        PublishTransient(methodName,args);
    }
    static void PublishPersistent(string methodName,params object[] args)
    {
        T[] subscribers = SubscriptionManager<T>.GetPersistentList(methodName);
        Publish(subscribers,true,methodName,args);
    }
    static void PublishTransient(string methodName,params object[] args)
    {
        T[] subscribers = SubscriptionManager<T>.GetTransientList(methodName);
        Publish(subscribers,false,methodName,args);
    }
    static void Publish(T[] subscribers,bool closeSubscribers,
                        string methodName,params object[] args)
    {
        // Fires the event at a single subscriber, then closes its proxy if asked to
        WaitCallback fire = delegate(object subscriber)
        {
            Invoke(subscriber as T,methodName,args);
            if(closeSubscribers)
            {
                using(subscriber as IDisposable)
                {}
            }
        };
        // Queues the firing of the event on a thread from the thread pool
        Action<T> queueUp = delegate(T subscriber)
        {
            ThreadPool.QueueUserWorkItem(fire,subscriber);
        };
        Array.ForEach(subscribers,queueUp);
    }
    static void Invoke(T subscriber,string methodName,object[] args)
    {
        Type type = typeof(T);
        MethodInfo methodInfo = type.GetMethod(methodName);
        try
        {
            methodInfo.Invoke(subscriber,args);
        }
        catch
        {} // Subscriber errors are of no interest to the publisher
    }
}
To simplify firing the event by the publishing service, the FireEvent method accepts the parameters to pass to the subscribers, yet its caller does not provide it with the name of the operation to invoke on the subscribers. Instead, FireEvent accesses its stack frame and extracts the name of its calling method. It then uses the helper method PublishPersistent to publish to all persistent subscribers and the PublishTransient helper method to publish to all transient subscribers. Both publishing methods operate almost identically: each accesses SubscriptionManager<T> to retrieve its respective subscriber list, which is returned as an array of proxies to the subscribers, and then passes that array to the Publish method to fire the event.
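The stack-frame lookup can be tried in isolation. The following is a minimal sketch and not part of the framework: the class CallerNameSketch, the LastEvent field, and the event methods are hypothetical stand-ins, and the MethodImpl attributes are an added precaution, on the assumption that JIT inlining or tail-call optimization could otherwise change which frame StackFrame(1) sees.

```csharp
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;

public static class CallerNameSketch
{
    // Last event recorded by FireEvent, kept only so the demo can display it.
    public static string LastEvent;

    // Mirrors the shape of FireEvent: the caller passes only the arguments,
    // and the operation name is read from the calling stack frame.
    [MethodImpl(MethodImplOptions.NoInlining)]
    static void FireEvent(params object[] args)
    {
        StackFrame callerFrame = new StackFrame(1); // frame 0 is FireEvent itself
        string methodName = callerFrame.GetMethod().Name;
        LastEvent = methodName + " with " + args.Length + " argument(s)";
    }

    // Hypothetical event operations. NoOptimization keeps the JIT from
    // inlining or tail-calling, either of which could hide this frame.
    [MethodImpl(MethodImplOptions.NoInlining | MethodImplOptions.NoOptimization)]
    public static void OnEvent1()
    {
        FireEvent();
    }

    [MethodImpl(MethodImplOptions.NoInlining | MethodImplOptions.NoOptimization)]
    public static void OnEvent3(int number, string text)
    {
        FireEvent(number, text);
    }

    public static void Main()
    {
        OnEvent1();
        Console.WriteLine(LastEvent);
        OnEvent3(7, "seven");
        Console.WriteLine(LastEvent);
    }
}
```

Because the operation name comes from the call stack, this approach only works when FireEvent is called directly from the event operation itself; calling it through an intermediate helper would report the helper's name instead.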
Publish could have simply invoked the subscribers at this point. However, I wanted to support concurrent publishing of the events, so that if any subscriber is undisciplined and takes a long time to process the event, this will not preclude other subscribers from receiving the event in a timely manner. Note that having the event operations marked as one-way is no guarantee of asynchronous invocation, and besides, I wanted to support concurrent publishing even when the event operation is not marked as one-way.
The Publish method defines two anonymous methods. The first anonymous method calls the Invoke helper method, which results in firing the event to the individual subscriber provided and then closes the proxy if so specified. Because Invoke was never compiled against the specific subscriber type, it uses reflection and late binding for the invocation. Invoke also suppresses any exceptions raised by the invocation, because they are of no interest to the publishing party. The second anonymous method queues up the first anonymous method to be executed by a thread from the thread pool. Finally, Publish invokes the second anonymous method on every subscriber in the provided array.
Note how uniformly PublishService<T> treats the subscribers: it hardly matters whether they are transient or persistent. The only difference is that after publishing to a persistent subscriber you need to close the proxy. This uniformity is achieved by the helper methods GetTransientList and GetPersistentList of SubscriptionManager<T>. Of these two, GetTransientList is the simpler one, as shown here:
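The combination of thread-pool dispatch, late-bound invocation, and exception suppression can be exercised without any WCF plumbing. This is a sketch under stated assumptions: IDemoEvents, DemoSubscriber, and ConcurrentPublisher<T> are hypothetical names, the subscribers are plain in-process objects rather than proxies, and, unlike Publish in Figure 11, the method waits for all invocations to finish so that the result can be observed.

```csharp
using System;
using System.Reflection;
using System.Threading;

// Hypothetical events contract; a real one would carry the WCF attributes.
public interface IDemoEvents
{
    void OnEvent2(int number);
}

// In-process stand-in for a subscriber proxy.
public class DemoSubscriber : IDemoEvents
{
    readonly bool m_Fail;
    public int Received;

    public DemoSubscriber(bool fail)
    {
        m_Fail = fail;
    }
    public void OnEvent2(int number)
    {
        if (m_Fail)
        {
            throw new InvalidOperationException("Subscriber failed");
        }
        Received = number;
    }
}

public static class ConcurrentPublisher<T> where T : class
{
    // Fires methodName on every subscriber from the thread pool and
    // blocks until all invocations have completed or failed.
    public static void PublishAndWait(T[] subscribers, string methodName,
                                      params object[] args)
    {
        MethodInfo method = typeof(T).GetMethod(methodName);
        if (method == null)
        {
            throw new ArgumentException("No such operation: " + methodName);
        }
        CountdownEvent done = new CountdownEvent(subscribers.Length);
        foreach (T subscriber in subscribers)
        {
            T target = subscriber; // private copy for the anonymous method
            ThreadPool.QueueUserWorkItem(delegate
            {
                try
                {
                    method.Invoke(target, args); // late-bound call
                }
                catch (Exception)
                {
                    // A failing subscriber must not affect the others
                }
                finally
                {
                    done.Signal();
                }
            });
        }
        done.Wait();
    }
}

public static class PublishDemo
{
    public static void Main()
    {
        DemoSubscriber bad = new DemoSubscriber(true);
        DemoSubscriber good = new DemoSubscriber(false);
        ConcurrentPublisher<IDemoEvents>.PublishAndWait(
            new IDemoEvents[] { bad, good }, "OnEvent2", 7);
        Console.WriteLine(good.Received);
    }
}
```

The failing subscriber is listed first on purpose: its exception is swallowed on its own pool thread, so the second subscriber still receives the event.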
public abstract class SubscriptionManager<T> where T : class
{
    internal static T[] GetTransientList(string eventOperation)
    {
        lock(typeof(SubscriptionManager<T>))
        {
            List<T> list = m_TransientStore[eventOperation];
            return list.ToArray();
        }
    }
    ... // More members
}
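The listing does not show the transient store itself. As a rough illustration only, a store that such a lookup could read from might be a dictionary keyed by operation name. Everything in this sketch (TransientStoreSketch<T>, its members, and the private lock object) is an assumption rather than the framework's actual code. The point it illustrates is that returning a copy of the list lets the publisher iterate without holding the lock while subscribers come and go.

```csharp
using System;
using System.Collections.Generic;

public static class TransientStoreSketch<T> where T : class
{
    static readonly object s_Lock = new object();
    static readonly Dictionary<string, List<T>> s_Store =
        new Dictionary<string, List<T>>();

    // Registers a subscriber for one event operation, ignoring duplicates.
    public static void Add(string eventOperation, T subscriber)
    {
        lock (s_Lock)
        {
            List<T> list;
            if (!s_Store.TryGetValue(eventOperation, out list))
            {
                list = new List<T>();
                s_Store[eventOperation] = list;
            }
            if (!list.Contains(subscriber))
            {
                list.Add(subscriber);
            }
        }
    }

    // Removes a subscriber from one event operation, if present.
    public static void Remove(string eventOperation, T subscriber)
    {
        lock (s_Lock)
        {
            List<T> list;
            if (s_Store.TryGetValue(eventOperation, out list))
            {
                list.Remove(subscriber);
            }
        }
    }

    // Returns a snapshot, so later Add or Remove calls do not affect
    // an array that a publisher is already iterating over.
    public static T[] GetList(string eventOperation)
    {
        lock (s_Lock)
        {
            List<T> list;
            if (s_Store.TryGetValue(eventOperation, out list))
            {
                return list.ToArray();
            }
            return new T[0];
        }
    }
}

public static class TransientStoreDemo
{
    public static void Main()
    {
        TransientStoreSketch<string>.Add("OnEvent1", "subscriberA");
        string[] snapshot = TransientStoreSketch<string>.GetList("OnEvent1");
        TransientStoreSketch<string>.Add("OnEvent1", "subscriberB");
        Console.WriteLine(snapshot.Length);
        Console.WriteLine(TransientStoreSketch<string>.GetList("OnEvent1").Length);
    }
}
```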
GetTransientList looks up in the transient store for all the subscribers to the specified operation and returns them as an array. GetPersistentList faces a bigger challenge. There is no ready-made list of proxies to persistent subscribers—all that is known about them is their address. GetPersistentList therefore needs to instantiate the persistent subscriber proxies as shown in Figure 12.
Figure 12 Creating Persistent Subscribers
public abstract class SubscriptionManager<T> where T : class
{
    internal static T[] GetPersistentList(string eventOperation)
    {
        string[] addresses = GetSubscribersToContractEventOperation(
                                    typeof(T).ToString(),eventOperation);

        List<T> subscribers = new List<T>(addresses.Length);

        // Only the address is stored, so create a proxy for each subscriber
        foreach(string address in addresses)
        {
            Binding binding = GetBindingFromAddress(address);
            T proxy = ChannelFactory<T>.CreateChannel(binding,
                                       new EndpointAddress(address));
            subscribers.Add(proxy);
        }
        return subscribers.ToArray();
    }
    static string[] GetSubscribersToContractEventOperation(
                           string eventsContract,string eventOperation)
    {
        ... // Uses ADO.NET to query SQL Server for subscribers to the
            // event
    }
    static Binding GetBindingFromAddress(string address)
    {
        if(address.StartsWith("http:") || address.StartsWith("https:"))
        {
            WSHttpBinding binding = new WSHttpBinding(SecurityMode.Message,true);
            binding.ReliableSession.Enabled = true;
            binding.TransactionFlow = true;
            return binding;
        }
        // Similar code for the TCP, named pipes and MSMQ bindings

        Debug.Assert(false,"Unsupported protocol specified");
        return null;
    }
    ... // More members
}
To create a proxy for each subscriber, GetPersistentList needs its address, binding, and contract. The contract is, of course, the type parameter for SubscriptionManager<T>. To obtain the addresses, GetPersistentList calls GetSubscribersToContractEventOperation to query the database and return an array of all addresses of the persistent subscribers who subscribed to the specified event. All GetPersistentList needs now is the binding used by each subscriber. For that, GetPersistentList calls the helper method GetBindingFromAddress, which infers the binding to use from the address scheme. GetBindingFromAddress treats all HTTP addresses as WSHttpBinding.
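The scheme-to-binding inference can be sketched without referencing the WCF assemblies by returning the binding's class name instead of a configured Binding instance. BindingSelector and BindingNameFromAddress are hypothetical names, the sample addresses are made up, and the mapping for the non-HTTP schemes is an assumption based on the comment in Figure 12 that similar code exists for the TCP, named pipes, and MSMQ bindings.

```csharp
using System;

public static class BindingSelector
{
    // Maps an endpoint address to the name of the binding class that a
    // proxy for that address would use in this sketch.
    public static string BindingNameFromAddress(string address)
    {
        // Uri exposes the scheme in lowercase, without the trailing colon
        string scheme = new Uri(address).Scheme;
        switch (scheme)
        {
            case "http":
            case "https":
                return "WSHttpBinding";
            case "net.tcp":
                return "NetTcpBinding";
            case "net.pipe":
                return "NetNamedPipeBinding";
            case "net.msmq":
                return "NetMsmqBinding";
            default:
                throw new NotSupportedException(
                    "Unsupported protocol specified: " + scheme);
        }
    }

    public static void Main()
    {
        string[] addresses =
        {
            "http://localhost:8000/MySubscriber",
            "net.tcp://localhost:8001/MySubscriber",
            "net.msmq://localhost/private/MySubscriberQueue"
        };
        foreach (string address in addresses)
        {
            Console.WriteLine(address + " -> " + BindingNameFromAddress(address));
        }
    }
}
```

Inferring the binding from the scheme keeps the subscription record down to a single address string, at the cost of fixing one binding configuration per transport.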
Administering Persistent Subscribers
While you can add and remove persistent subscriptions at run-time using the methods of IPersistentSubscriptionService shown in Figure 9, because of their persistent nature it is likely that managing the subscriptions will be done via some kind of administration tool. To that end, IPersistentSubscriptionService defines additional operations that answer various queries of the subscriber store, as shown in Figure 13.
Figure 13 Administration Query Operations
[DataContract]
public struct PersistentSubscription
{
    [DataMember]
    public string Address;

    [DataMember]
    public string EventsContract;

    [DataMember]
    public string EventOperation;
}
[ServiceContract]
public interface IPersistentSubscriptionService
{
    //Administration operations
    [OperationContract]
    [TransactionFlow(TransactionFlowOption.Allowed)]
    PersistentSubscription[] GetAllSubscribers();

    [OperationContract]
    [TransactionFlow(TransactionFlowOption.Allowed)]
    PersistentSubscription[] GetSubscribersToContract(
                                            string eventsContract);

    [OperationContract]
    [TransactionFlow(TransactionFlowOption.Allowed)]
    string[] GetSubscribersToContractEventType(
                   string eventsContract,string eventOperation);
    ... // More members
}
These administration operations utilize a simple data structure called PersistentSubscription that contains the address of the subscriber, the subscribed contract, and the event. My publish-subscribe framework includes a sample persistent subscription administration tool called Persistent Subscription Manager, shown in Figure 14.
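To make the three query shapes in Figure 13 concrete, here is an in-memory sketch. The framework keeps subscriptions in SQL Server, so the list-backed InMemorySubscriptionStore, the SubscriptionRecord struct, and the sample addresses below are hypothetical substitutes used only to show what each query returns.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for PersistentSubscription, without the data contract attributes.
public struct SubscriptionRecord
{
    public string Address;
    public string EventsContract;
    public string EventOperation;

    public SubscriptionRecord(string address, string eventsContract,
                              string eventOperation)
    {
        Address = address;
        EventsContract = eventsContract;
        EventOperation = eventOperation;
    }
}

public class InMemorySubscriptionStore
{
    readonly List<SubscriptionRecord> m_Records = new List<SubscriptionRecord>();

    public void Add(string address, string eventsContract, string eventOperation)
    {
        m_Records.Add(new SubscriptionRecord(address, eventsContract, eventOperation));
    }

    // Every stored subscription, regardless of contract or event.
    public SubscriptionRecord[] GetAllSubscribers()
    {
        return m_Records.ToArray();
    }

    // All subscriptions to any event on the given contract.
    public SubscriptionRecord[] GetSubscribersToContract(string eventsContract)
    {
        return m_Records.FindAll(delegate(SubscriptionRecord record)
        {
            return record.EventsContract == eventsContract;
        }).ToArray();
    }

    // Only the addresses subscribed to one event on one contract.
    public string[] GetSubscribersToContractEventType(string eventsContract,
                                                      string eventOperation)
    {
        List<string> addresses = new List<string>();
        foreach (SubscriptionRecord record in m_Records)
        {
            if (record.EventsContract == eventsContract &&
                record.EventOperation == eventOperation)
            {
                addresses.Add(record.Address);
            }
        }
        return addresses.ToArray();
    }
}

public static class AdminQueryDemo
{
    public static void Main()
    {
        InMemorySubscriptionStore store = new InMemorySubscriptionStore();
        store.Add("http://localhost:8000/A", "IMyEvents", "OnEvent1");
        store.Add("http://localhost:8000/A", "IMyEvents", "OnEvent2");
        store.Add("net.tcp://localhost:8001/B", "IMyEvents", "OnEvent1");
        Console.WriteLine(store.GetAllSubscribers().Length);
        Console.WriteLine(store.GetSubscribersToContract("IMyEvents").Length);
        Console.WriteLine(
            store.GetSubscribersToContractEventType("IMyEvents", "OnEvent1").Length);
    }
}
```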
Figure 14 Persistent Subscription Manager
The tool uses IPersistentSubscriptionService to add or remove subscriptions. To add a new subscription, you need to provide the tool with the metadata exchange address of the events contract definition. You can use the metadata exchange address of the persistent subscribers themselves or the metadata exchange address of the publish service because they are polymorphic. Enter the metadata exchange base address in the MEX Address textbox and click the Lookup button. The tool will programmatically retrieve the metadata of the event service and populate the Contract and Event comboboxes.
To subscribe, provide the address of the persistent subscriber and click the Subscribe button. Persistent Subscription Manager then adds the subscription by calling the subscription service (the MySubscriptionService service in the examples so far). The address for the subscription service is maintained in the Persistent Subscription Manager config file.
Queued Publishers and Subscribers
Instead of using a synchronous binding for either publishing or subscribing to the events, you can use NetMsmqBinding. Queued publish-subscribe events combine the benefits of a loosely coupled system and the flexibility of disconnected execution. When using queued events, all events on the contract must, of course, be marked as one-way operations. As shown in Figure 15, you can use queuing at either end independently.
Figure 15 Queued Publish-Subscribe
You can have a queued publisher and connected synchronous subscribers. You could have a connected publisher publishing to queued subscribers or you could have both queued publishers and queued subscribers. Note, however, that you can't have queued transient subscriptions because there is no support with the MSMQ binding for duplex callbacks. As before, you can use the administration tool to manage the subscribers and the administration operations are still connected and synchronous.
To utilize a queued publisher, the publishing service needs to expose a queued endpoint using the MSMQ binding. When firing events at a queued publisher, the publishing service can be offline or the publishing client itself can be disconnected. Note that when publishing two events to a queued publishing service there are no guarantees as to the order of delivery and the processing of these events by the end subscribers. You can only assume the order of publishing when the events contract is configured for a session and only when dealing with a single publishing service.
To deploy a queued subscriber, the persistent subscribing service needs to expose a queued endpoint. Doing so will enable it to be offline even when the publisher is online. When the subscriber is connected again, it will receive all its queued events. In addition, queued subscribers are ideal for when the publishing service itself is disconnected, because no events are lost. When multiple events are fired at a single queued subscriber, there are no guarantees as to the order of delivery of the events. The subscriber can only assume the order of publishing when the events contract is configured for a session. Of course, having both a queued publisher and subscriber allows them to work offline at the same time.
Conclusion
Windows Communication Foundation offers built-in support for powerful and useful programming models, using request-reply, one-way operations or duplex callbacks. Understanding the operations and choosing the right calling mode is one of the first and most important design decisions you will make, and yet applying each mode is surprisingly easy. You can use these operation types as-is in your service-oriented applications or compose your own higher-level abstractions such as the publish-subscribe pattern.
Juval Lowy is a software architect with IDesign providing WCF training and WCF architecture consulting. He is currently working on a comprehensive book on WCF. He is also the Microsoft Regional Director for the Silicon Valley. Contact Juval at www.idesign.net.