C#: A Message Queuing Service Application
Carl Nolan
Microsoft Corporation
Updated March 2002
Summary: This article outlines a Windows service solution designed to process several message queues, focusing on the application of the Microsoft .NET Framework and C#. (39 printed pages)
Download CSharpMessageService.exe.
Contents
Introduction
The .NET Framework Application
Application Structure
Service Classes
Instrumentation
Language Neutrality
Installation
Conclusion
References
Introduction
Recently Microsoft introduced a new platform for building integrated applications, the Microsoft® .NET Framework. The .NET Framework allows one to quickly build and deploy Web services and applications in any supported programming language. This language-neutral Framework is made possible by the Microsoft Intermediate Language and the JIT compiler.
Along with the .NET Framework has come a new programming language, C# (pronounced C Sharp). C# is a simple, modern, object-oriented, and type-safe programming language. Utilizing the .NET Framework and C# (in addition to Microsoft Visual Basic® and Managed C++) one can write highly functional Microsoft Windows® and Web applications and services. This article presents such a solution, focusing on the application of the .NET Framework and C# rather than on the programming language itself. A C# language introduction can be found at https://msdn.microsoft.com/vstudio/techinfo/articles/upgrade/Csharpintro.asp.
In a recent MSDN article, a highly available, scalable MSMQ load-balancing architecture was presented. This solution involved the development of a Windows service that acted as a smart message router. Previously such a solution was the realm of the C++ programmer. With the advent of the .NET Framework this is no longer the case.
The .NET Framework Application
The solution to be outlined is a Windows service designed to process several message queues; each is processed by multiple threads, both receiving and processing messages. Example processes are given for routing messages using either a round-robin technique or an application specific value (the message AppSpecific property) as an index to a list of destination queues, dispersing the message to several queues, and for calling a component method with the message properties. In the latter case, the requirement of the component is that it implements a given interface, called IProcessMessage. To handle errors the application will send messages that cannot be processed into an error queue.
The messaging application is structured similarly to the previous ATL application, the main differences being the encapsulation of the code to manage the service and the use of the .NET Framework components. As the .NET Framework is object-oriented, it should come as no surprise that to create a Windows service all one has to do is create a class that inherits the ServiceBase class, from the System.ServiceProcess assembly.
Application Structure
The main class in the application is ServiceControl, the class that inherits the ServiceBase class. When inheriting from ServiceBase, one overrides the OnStart and OnStop methods and, optionally, the OnPause and OnContinue methods. The class is actually constructed within the static method, Main:
using System;
using System.ServiceProcess;
public class ServiceControl: ServiceBase
{
// main entry point that creates the service object
public static void Main()
{
ServiceBase.Run(new ServiceControl());
}
// constructor object that defines the service parameters
public ServiceControl()
{
CanPauseAndContinue = true;
ServiceName = "MSDNMessageService";
AutoLog = false;
}
protected override void OnStart(string[] args) {...}
protected override void OnStop() {...}
protected override void OnPause() {...}
protected override void OnContinue() {...}
}
The ServiceControl class creates a series of WorkerInstance objects, an instance of a WorkerInstance class being created for each message queue requiring processing. The WorkerInstance class in turn creates a series of WorkerThread objects, based on the required number of threads defined to process the queue. The WorkerThread class actually creates a processing thread that will perform the actual service work.
The main purpose of the WorkerInstance and WorkerThread classes is the acknowledgment of the service control Start, Stop, Pause, and Continue commands. Because these commands must not block, each one ultimately triggers an action on a background processing thread.
The WorkerThread is an abstract class that is inherited by WorkerThreadAppSpecific, WorkerThreadRoundRobin, WorkerThreadDisperse, and WorkerThreadAssembly. Each of these classes processes messages in a different manner. The first three process a message by sending it to another queue (the difference being the manner in which the receiving queue path is determined), while the last uses the message properties to call a component method.
Application error handling within the .NET Framework is based around exception classes. Any error that is thrown or caught must be of a class derived from the base Exception class; errors defined by an application conventionally derive from ApplicationException, which itself derives from Exception. The WorkerThreadException class represents such an implementation, extending the base class with the addition of a property that defines whether the service should continue running.
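As a minimal sketch of such an exception class: the class name, the (message, terminate) constructor, and the Terminate property match how the code later in this article uses WorkerThreadException, but the body and the demo helper are assumptions rather than the downloadable implementation.

```csharp
using System;

// Sketch only: an application exception that also records whether
// the failure should terminate the worker thread.
public class WorkerThreadException : ApplicationException
{
    private readonly bool terminate;

    public WorkerThreadException(string message, bool terminate)
        : base(message)
    {
        this.terminate = terminate;
    }

    // true when the service should stop processing
    public bool Terminate
    {
        get { return terminate; }
    }
}

public static class WorkerThreadExceptionDemo
{
    // Hypothetical helper: throws the exception, catches it by its
    // specific type, and reports the terminate flag it carried.
    public static bool ThrowAndInspect(bool terminate)
    {
        try
        {
            throw new WorkerThreadException("Processing failed", terminate);
        }
        catch (WorkerThreadException ex)
        {
            return ex.Terminate;
        }
    }
}
```

Catching the specific type first, as the helper does, is what lets a caller distinguish a failure with a known terminate status from an unexpected exception.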
Finally, the application contains two structs. These value types define the runtime parameters of a worker process or thread in order to simplify the construction of the WorkerInstance and WorkerThread objects. Because a struct is a value type, assigning it copies its values rather than a reference, so each object maintains its own copy of these runtime parameters.
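The difference between value and reference semantics can be illustrated with a small sketch. The WorkerSettings and WorkerSettingsClass types and their fields are hypothetical stand-ins, not the structs shipped with the service.

```csharp
using System;

// Hypothetical value type holding runtime parameters.
public struct WorkerSettings
{
    public string ProcessName;
    public int NumberThreads;
}

// Hypothetical reference type with the same fields, for comparison.
public class WorkerSettingsClass
{
    public string ProcessName;
    public int NumberThreads;
}

public static class ValueSemanticsDemo
{
    // Assigning a struct copies its fields, so changing the copy
    // leaves the original untouched.
    public static int StructOriginalAfterCopyEdit()
    {
        WorkerSettings original = new WorkerSettings();
        original.ProcessName = "Worker1";
        original.NumberThreads = 8;

        WorkerSettings copy = original;
        copy.NumberThreads = 2;
        return original.NumberThreads;
    }

    // Assigning a class instance copies only the reference, so both
    // variables see the change.
    public static int ClassOriginalAfterCopyEdit()
    {
        WorkerSettingsClass original = new WorkerSettingsClass();
        original.ProcessName = "Worker1";
        original.NumberThreads = 8;

        WorkerSettingsClass alias = original;
        alias.NumberThreads = 2;
        return original.NumberThreads;
    }
}
```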
IProcessMessage Interface
One of the provided WorkerThread implementations is a class that calls a component method. This class, called WorkerThreadAssembly, uses an interface called IProcessMessage to define the contract between the service and the component.
Unlike with the current version of Visual Studio®, interfaces in the .NET Framework can be explicitly defined in any language, removing the need to create and compile IDL files. Hence, using C# the IProcessMessage interface is defined as below:
[ComVisible(true)]
public interface IProcessMessage
{
ProcessMessageReturn Process
(string messageLabel, string messageBody, int messageAppSpecific);
void Release();
}
The Process method, as in the ATL code, is designated for processing messages. The return code of the Process method is defined by the enumeration type ProcessMessageReturn. The enumeration values are interpreted as follows: ReturnGood continues processing, ReturnBad writes the message to the error queue, and ReturnAbort terminates processing.
public enum ProcessMessageReturn
{
ReturnGood,
ReturnBad,
ReturnAbort
}
The Release method provides a mechanism for the service to gracefully destroy the class instance. As the destructor of the instance of the class will get called only during a garbage collection, it is a good practice to ensure that all classes that have expensive resources (such as database connections) have a method that can be called prior to destruction to release these resources.
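A component satisfying this contract might look like the sketch below. The interface and enumeration are repeated from above (without the ComVisible attribute) so that the block stands alone; the SampleProcessor class and its rules for choosing a return value are purely illustrative assumptions.

```csharp
using System;

public enum ProcessMessageReturn
{
    ReturnGood,
    ReturnBad,
    ReturnAbort
}

public interface IProcessMessage
{
    ProcessMessageReturn Process
        (string messageLabel, string messageBody, int messageAppSpecific);
    void Release();
}

// Hypothetical component: the rules below are illustrative only.
public class SampleProcessor : IProcessMessage
{
    private bool released;

    public bool IsReleased
    {
        get { return released; }
    }

    public ProcessMessageReturn Process
        (string messageLabel, string messageBody, int messageAppSpecific)
    {
        // once resources are released the component cannot continue
        if (released)
        {
            return ProcessMessageReturn.ReturnAbort;
        }
        // an empty body is treated as a bad message for the error queue
        if (string.IsNullOrEmpty(messageBody))
        {
            return ProcessMessageReturn.ReturnBad;
        }
        return ProcessMessageReturn.ReturnGood;
    }

    // called by the service so expensive resources are freed promptly
    // rather than waiting for a garbage collection
    public void Release()
    {
        released = true;
    }
}
```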
Namespaces
At this point a brief mention of namespaces is warranted. Namespaces allow applications to be organized into logical elements, for both internal and external representation. All the code within this service is contained within the MSDNMessageService.Service namespace. Although the service code is contained within several files, one does not need to reference the other files because they are contained within the same namespace.
Since the IProcessMessage interface is contained within the MSDNMessageService.Interface namespace, the thread class that uses this interface imports that namespace.
Service Classes
The purpose of the application is to monitor and process message queues. Each queue performs a different process on received messages and the application is implemented as a Windows service.
The ServiceBase Class
As mentioned, the basic structure of a Service is a class that inherits from ServiceBase. The important methods are OnStart, OnStop, OnPause, and OnContinue, with each overridden method corresponding directly to a service control action. The purpose of the OnStart method is to create WorkerInstance objects. The WorkerInstance class in turn creates WorkerThread objects from which the threads that perform the service work are created.
The runtime configuration of the service, and hence the properties of the WorkerInstance and WorkerThread objects, are maintained within an XML configuration file, with an associated XSD (XML Schema Definition). An example XML configuration file would be:
<?xml version="1.0" encoding="utf-8" ?>
<ProcessList
xmlns="urn:messageservice-schema"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:messageservice-schema MessageService.xsd">
<ProcessDefinition ProcessName="Worker1"
ProcessType="RoundRobin" NumberThreads="8" Transactions="false">
<Description>RoundRobin 8 Threads No Transactions</Description>
<InputQueue>.\private$\test_load1</InputQueue>
<ErrorQueue>.\private$\test_error</ErrorQueue>
<OutputList>
<OutputName>.\private$\test_out11</OutputName>
<OutputName>.\private$\test_out12</OutputName>
<OutputName>.\private$\test_out13</OutputName>
</OutputList>
</ProcessDefinition>
<ProcessDefinition ProcessName="Worker2"
ProcessType="AppSpecific" NumberThreads="4" Transactions="true">
<Description>AppSpecific 4 Threads and Transactions</Description>
<InputQueue>.\private$\test_load2_t</InputQueue>
<ErrorQueue>.\private$\test_error_t</ErrorQueue>
<OutputList>
<OutputName>.\private$\test_out21_t</OutputName>
<OutputName>.\private$\test_out22_t</OutputName>
</OutputList>
</ProcessDefinition>
<ProcessDefinition ProcessName="Worker3"
ProcessType="Disperse" NumberThreads="2" Transactions="true">
<Description>Disperse 2 Threads and Transactions</Description>
<InputQueue>.\private$\test_load3_t</InputQueue>
<ErrorQueue>.\private$\test_error_t</ErrorQueue>
<OutputList>
<OutputName>.\private$\test_out31_t</OutputName>
<OutputName>.\private$\test_out32_t</OutputName>
</OutputList>
</ProcessDefinition>
<ProcessDefinition ProcessName="Worker4"
ProcessType="Assembly" NumberThreads="4" Transactions="false">
<Description>Assembly 4 Threads No Transactions</Description>
<InputQueue>.\private$\test_load4</InputQueue>
<ErrorQueue>.\private$\test_error</ErrorQueue>
<AssemblyDefinition>
<FullPath>MessageExample.dll</FullPath>
<ClassName>MSDNMessageService.MessageSample.ExampleClass</ClassName>
</AssemblyDefinition>
</ProcessDefinition>
</ProcessList>
Although not absolutely required, the XSD document referenced in the schemaLocation attribute ensures a valid set of properties for each WorkerInstance and WorkerThread object. Such a schema definition would be:
<?xml version="1.0" encoding="utf-8" ?>
<xsd:schema targetNamespace="urn:messageservice-schema" xmlns="urn:messageservice-schema" xmlns:xsd="http://www.w3.org/2001/XMLSchema" elementFormDefault="qualified">
<xsd:annotation>
<xsd:documentation xml:lang="en">
Message Queueing Service Schema
Created September 2001
</xsd:documentation>
</xsd:annotation>
<xsd:element name="ProcessList" type="PROCESSLIST" />
<xsd:complexType name="PROCESSLIST">
<xsd:sequence>
<xsd:element name="ProcessDefinition"
minOccurs="1" maxOccurs="unbounded">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="Description" type="xsd:string" />
<xsd:element name="InputQueue" type="xsd:string" />
<xsd:element name="ErrorQueue" type="xsd:string" />
<xsd:choice>
<xsd:element name="OutputList" type="OUTPUTLIST" />
<xsd:element name="AssemblyDefinition"
type="ASSEMBLYDEFINITION" />
</xsd:choice>
</xsd:sequence>
<xsd:attribute name="ProcessName" type="xsd:string" />
<xsd:attribute name="ProcessType">
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="RoundRobin" />
<xsd:enumeration value="AppSpecific" />
<xsd:enumeration value="Disperse" />
<xsd:enumeration value="Assembly" />
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="NumberThreads" type="xsd:integer" />
<xsd:attribute name="Transactions" type="xsd:boolean" />
</xsd:complexType>
</xsd:element>
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="OUTPUTLIST">
<xsd:sequence>
<xsd:element name="OutputName" type="xsd:string"
minOccurs="1" maxOccurs="unbounded" />
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="ASSEMBLYDEFINITION">
<xsd:sequence>
<xsd:element name="FullPath" type="xsd:string" />
<xsd:element name="ClassName" type="xsd:string" />
</xsd:sequence>
</xsd:complexType>
</xsd:schema>
The name of the XML configuration file is contained within an application configuration file. Application configuration files contain settings that include assembly binding policy, remoting objects, custom channels, and settings the application can read. The application configuration file takes the name of the application with a .config extension and is placed in the application executable folder. For example, the message server application is called MessageService.EXE and its configuration file is called MessageService.EXE.config.
Within the application configuration file, a special section called appSettings allows for settings that an application can read. These settings are a set of key value pairs. In this case a key called ConfigurationFile provides the full path to the XML configuration file.
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<appSettings>
<add key="ConfigurationFile" value="C:\Path\MessageService.xml" />
<add key="SchemaFile" value="C:\Path\MessageService.xsd" />
</appSettings>
</configuration>
Access to this information is managed through the ConfigurationSettings class from the System.Configuration namespace. The AppSettings property returns the value of the requested key, which in this case is the fully qualified path of the XML configuration file.
string configFile = ConfigurationSettings.AppSettings["ConfigurationFile"];
Once the location of the XML configuration file is determined it can easily be processed as an XML Document. In loading the XML document, a validation against the schema definition is performed:
XmlDocument configXmlDoc = new XmlDocument();
XmlValidatingReader configReader = new XmlValidatingReader(new XmlTextReader(configFile));
configReader.ValidationType = ValidationType.Schema;
configXmlDoc.PreserveWhitespace = false;
configXmlDoc.Load(configReader);
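Within the configuration file, the corresponding schema definition is referenced through the schemaLocation attribute, which the validating reader can use to locate the schema without it being added to the reader's Schemas collection. Alternatively, the schema file can be added to the Schemas collection explicitly, before the document is loaded: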
string schemaFile = ConfigurationSettings.AppSettings["SchemaFile"];
configReader.Schemas.Add(null, schemaFile);
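For readers on later versions of the Framework, where XmlValidatingReader is marked obsolete, the same validate-while-loading idea can be sketched with XmlReaderSettings. This swaps in a different API from the one used in the article; the ConfigValidator class and its method are hypothetical, and in-memory strings are used in place of the configuration and schema files so that the block stands alone.

```csharp
using System;
using System.IO;
using System.Xml;
using System.Xml.Schema;

public static class ConfigValidator
{
    // Loads the XML text into a document while validating it against
    // the supplied XSD text; returns the number of validation errors.
    public static int CountValidationErrors(string xml, string xsd)
    {
        int errors = 0;

        XmlReaderSettings settings = new XmlReaderSettings();
        settings.ValidationType = ValidationType.Schema;
        // a null namespace means: use the schema's own targetNamespace
        settings.Schemas.Add(null, XmlReader.Create(new StringReader(xsd)));
        // count problems instead of throwing on the first one
        settings.ValidationEventHandler += (sender, e) => { errors++; };

        using (XmlReader reader =
            XmlReader.Create(new StringReader(xml), settings))
        {
            XmlDocument configXmlDoc = new XmlDocument();
            configXmlDoc.PreserveWhitespace = false;
            configXmlDoc.Load(reader);
        }
        return errors;
    }
}
```

Collecting errors through the event handler, rather than letting the reader throw, makes it possible to log every problem in a configuration file in one pass.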
Before one can perform XPath queries, an appropriate XML namespace reference must be defined. The XML configuration file does not use a null namespace but rather one that is defined for the schema definition. When performing XPath queries, this namespace must be referenced within the namespace manager:
XmlNamespaceManager configNamespace = new XmlNamespaceManager(configXmlDoc.NameTable);
configNamespace.AddNamespace("default", "urn:messageservice-schema");
To process the XML configuration file one merely needs to process the collection of nodes for each required worker process, and then for each process query the corresponding attribute and element properties. For the attributes this is performed using the GetNamedItem method of the attributes collection. For elements, this is performed using the SelectNodes and SelectSingleNode functions of the XML node; the inputs to these are the required XPath query and namespace manager.
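The effect of the namespace manager can be seen in a small sketch that loads configuration text from a string. The ConfigQuery class is hypothetical; the "default" prefix and the urn:messageservice-schema namespace are those used in this article.

```csharp
using System;
using System.Collections.Generic;
using System.Xml;

public static class ConfigQuery
{
    // Returns the ProcessName attribute of every ProcessDefinition
    // element, using a prefix mapped to the configuration namespace.
    public static List<string> GetProcessNames(string xml)
    {
        XmlDocument configXmlDoc = new XmlDocument();
        configXmlDoc.LoadXml(xml);

        XmlNamespaceManager configNamespace =
            new XmlNamespaceManager(configXmlDoc.NameTable);
        configNamespace.AddNamespace("default", "urn:messageservice-schema");

        List<string> names = new List<string>();
        foreach (XmlNode node in configXmlDoc.SelectNodes(
            "descendant::default:ProcessDefinition", configNamespace))
        {
            names.Add(node.Attributes.GetNamedItem("ProcessName").Value);
        }
        return names;
    }

    // The same query without a prefix looks in the null namespace and
    // therefore does not match elements in the configuration namespace.
    public static int CountWithoutPrefix(string xml)
    {
        XmlDocument configXmlDoc = new XmlDocument();
        configXmlDoc.LoadXml(xml);
        return configXmlDoc.SelectNodes("descendant::ProcessDefinition").Count;
    }
}
```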
Because the Service class must maintain a list of created worker objects, the Hashtable collection, which holds a list of name value pairs of type object, is used. In addition to supporting enumerations, the Hashtable allows for querying values by key. In the application, the XML process name is used as the unique key:
foreach (XmlNode processXmlDefinition in configXmlDoc.SelectNodes("descendant::default:ProcessDefinition", configNamespace))
{
// found a process so create a new WorkerFormatter struct
workerDefintion = new WorkerFormatter();
string processName =
processXmlDefinition.Attributes.GetNamedItem("ProcessName").Value;
string processType =
processXmlDefinition.Attributes.GetNamedItem("ProcessType").Value;
// define the process name and type placing them in the worker struct
workerDefintion.ProcessName = processName;
switch (processType)
{
case "RoundRobin":
workerDefintion.ProcessType =
WorkerFormatter.SFProcessType.ProcessRoundRobin;
break;
case "AppSpecific":
workerDefintion.ProcessType =
WorkerFormatter.SFProcessType.ProcessAppSpecific;
break;
case "Disperse":
workerDefintion.ProcessType =
WorkerFormatter.SFProcessType.ProcessDisperse;
break;
case "Assembly":
workerDefintion.ProcessType =
WorkerFormatter.SFProcessType.ProcessAssembly;
break;
default:
throw new ApplicationException("Unknown Processing Type");
}
workerDefintion.NumberThreads = Convert.ToInt32(
processXmlDefinition.Attributes.GetNamedItem("NumberThreads").Value
);
// determine the transaction status of the processing
switch (Convert.ToBoolean(
processXmlDefinition.Attributes.GetNamedItem("Transactions").Value
))
{
case false:
workerDefintion.Transactions =
WorkerFormatter.SFTransactions.NotRequired;
break;
case true:
workerDefintion.Transactions =
WorkerFormatter.SFTransactions.Required;
break;
default:
throw new ApplicationException("Unknown Required Transaction State");
}
// place all remaining elements of the process definition in the worker formatter
workerDefintion.ProcessDesc = processXmlDefinition.SelectSingleNode
("descendant::default:Description", configNamespace).InnerText;
workerDefintion.InputQueue = processXmlDefinition.SelectSingleNode
("descendant::default:InputQueue", configNamespace).InnerText;
workerDefintion.ErrorQueue = processXmlDefinition.SelectSingleNode
("descendant::default:ErrorQueue", configNamespace).InnerText;
// based on the process type either
// locate the assembly definition or the output queue names
// in the event that the process is neither throw an exception
switch (processType)
{
case "Assembly":
workerDefintion.OutputName = new string[2];
workerDefintion.OutputName[0] =
processXmlDefinition.SelectSingleNode(
"descendant::default:AssemblyDefinition/default:FullPath",
configNamespace).InnerText;
workerDefintion.OutputName[1] =
processXmlDefinition.SelectSingleNode(
"descendant::default:AssemblyDefinition/default:ClassName",
configNamespace).InnerText;
break;
case "AppSpecific":
case "Disperse":
case "RoundRobin":
XmlNodeList processXmlOutputs;
processXmlOutputs = processXmlDefinition.SelectNodes
("descendant::default:OutputList/default:OutputName",
configNamespace);
if (processXmlOutputs.Count == 0)
{
throw new ApplicationException
("No Output Parameters " + processName);
}
// allocate the output array based on the number of output names
workerDefintion.OutputName =
new string[processXmlOutputs.Count];
// iterate through output list for the queue/assembly names
int idx = 0;
foreach (XmlNode outputName in processXmlOutputs)
{
workerDefintion.OutputName[idx] = outputName.InnerText;
idx++;
}
break;
default:
throw new ApplicationException("Unknown Processing Type");
}
// add the information into the collection of Worker Formatters
if (workerReferences != null) {
// validate the name is unique
// (processName was already read from the ProcessName attribute above)
if (workerReferences.ContainsKey(processName))
{
throw new ArgumentException
("Process Name Must be Unique: " + processName);
}
// create a worker object in the worker array
workerReferences.Add(processName,
new WorkerInstance(workerDefintion));
}
}
Both the WorkerInstance and WorkerThread classes have corresponding service control methods that are called based on the service control action. As each WorkerInstance object is referenced in the Hashtable, the contents of the Hashtable are enumerated in order to call the appropriate service control method:
foreach (WorkerInstance workerReference in workerReferences.Values)
{
workerReference.StartService();
}
Similarly, the implemented OnPause, OnContinue, and OnStop methods operate by calling the corresponding methods on the WorkerInstance objects.
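The use of a Hashtable as a keyed registry of workers can be sketched in isolation as follows. The WorkerRegistry class and its members are hypothetical; only the unique-key check and the enumeration of Values mirror the service code.

```csharp
using System;
using System.Collections;

// Hypothetical wrapper around a Hashtable keyed by process name.
public class WorkerRegistry
{
    private readonly Hashtable workerReferences = new Hashtable();

    public int Count
    {
        get { return workerReferences.Count; }
    }

    // Adds a worker, rejecting a process name that is already present.
    public void Add(string processName, object worker)
    {
        if (workerReferences.ContainsKey(processName))
        {
            throw new ArgumentException
                ("Process Name Must be Unique: " + processName);
        }
        workerReferences.Add(processName, worker);
    }

    // Looks a worker up by key; returns null when the key is absent.
    public object Find(string processName)
    {
        return workerReferences[processName];
    }

    // Applies an action to every worker; the enumeration order of a
    // Hashtable is not specified, so callers must not rely on it.
    public void ForEach(Action<object> action)
    {
        foreach (object worker in workerReferences.Values)
        {
            action(worker);
        }
    }
}
```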
The WorkerInstance Class
The primary function of the WorkerInstance class is to create and manage WorkerThread objects. The StartService, StopService, PauseService, and ContinueService methods call the corresponding WorkerThread methods. The actual WorkerThread objects are created in the StartService method. Like the Service class, which uses a Hashtable to manage the references to the worker objects, WorkerInstance uses an ArrayList, simply a dynamically sized array, to maintain a list of thread objects.
Within this array the WorkerInstance class will create one of the implemented versions of the WorkerThread class. The WorkerThread class, to be discussed below, is an abstract class that must be inherited. The derived classes define how a message is processed.
threadReferences = new ArrayList();
for (int idx=0; idx<workerDefintion.NumberThreads; idx++)
{
WorkerThreadFormatter threadDefinition = new WorkerThreadFormatter();
threadDefinition.ProcessName = workerDefintion.ProcessName;
threadDefinition.ProcessDesc = workerDefintion.ProcessDesc;
threadDefinition.ThreadNumber = idx;
threadDefinition.InputQueue = workerDefintion.InputQueue;
threadDefinition.ErrorQueue = workerDefintion.ErrorQueue;
threadDefinition.OutputName = workerDefintion.OutputName;
// define the worker type and insert into the work thread struct
WorkerThread workerThread;
switch (workerDefintion.ProcessType)
{
case WorkerFormatter.SFProcessType.ProcessRoundRobin:
workerThread =
new WorkerThreadRoundRobin(this, threadDefinition);
break;
case WorkerFormatter.SFProcessType.ProcessAppSpecific:
workerThread =
new WorkerThreadAppSpecific(this, threadDefinition);
break;
case WorkerFormatter.SFProcessType.ProcessDisperse:
workerThread =
new WorkerThreadDisperse(this, threadDefinition);
break;
case WorkerFormatter.SFProcessType.ProcessAssembly:
workerThread =
new WorkerThreadAssembly(this, threadDefinition);
break;
default:
throw new ApplicationException("Unknown Processing Type");
}
threadReferences.Insert(idx, workerThread);
}
Once all the objects have been created, they can be started by calling the StartService method of each thread object:
foreach(WorkerThread threadReference in threadReferences)
{
threadReference.StartService();
}
The StopService, PauseService, and ContinueService methods all perform similar operations within a foreach loop. The StopService method has the following Garbage Collection (GC) operation:
GC.SuppressFinalize(this);
Within the class destructor the StopService method gets called. This allows the objects to be correctly terminated if the StopService method is not explicitly called. If the StopService method has already been called, the destructor is no longer needed. The SuppressFinalize method prevents the object's Finalize method, which is the actual implementation of the destructor, from being called.
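The interplay between an explicit stop method and the destructor can be sketched as below. The ManagedWorker class and its StopCount counter are hypothetical and exist only to make the behavior observable.

```csharp
using System;

// Hypothetical worker showing an explicit stop method backed by a
// destructor that acts as a safety net.
public class ManagedWorker
{
    private bool stopped;
    private int stopCount;

    // number of times the cleanup work has actually run
    public int StopCount
    {
        get { return stopCount; }
    }

    public void StopService()
    {
        if (!stopped)
        {
            stopped = true;
            stopCount++;   // stands in for releasing real resources
        }
        // cleanup is done, so the finalizer no longer needs to run
        GC.SuppressFinalize(this);
    }

    // destructor: only reached if StopService was never called
    ~ManagedWorker()
    {
        StopService();
    }
}
```

Guarding the cleanup with a flag keeps StopService safe to call more than once, whether from the service or from the destructor.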
The WorkerThread Abstract Class
The WorkerThread is an abstract class that is inherited by WorkerThreadAppSpecific, WorkerThreadRoundRobin, WorkerThreadDisperse, and WorkerThreadAssembly. Because the majority of the processing of a queue is identical, regardless of how the message gets processed, the WorkerThread class provides this functionality. The class provides abstract methods that must be overridden to manage resources and process messages.
The work of the class is once again implemented in the StartService, StopService, PauseService, and ContinueService methods. In the StartService method, the input and error queues are referenced. Within the .NET Framework, messaging is handled by the System.Messaging namespace:
string inputQueueName = threadDefinition.InputQueue;
string errorQueueName = threadDefinition.ErrorQueue;
if (!MessageQueue.Exists(inputQueueName) ||
!MessageQueue.Exists(errorQueueName))
{
// queue does not exist so throw an error
throw new ArgumentException("The Input/Error Queue does not Exist");
}
// try and open the input queue and set the default properties
inputQueue = new MessageQueue(inputQueueName);
inputQueue.MessageReadPropertyFilter.Body = true;
inputQueue.MessageReadPropertyFilter.AppSpecific = true;
// open the error queue
errorQueue = new MessageQueue(errorQueueName);
// set the formatter to be ActiveX if using COM to load messages
inputQueue.Formatter = new ActiveXMessageFormatter();
errorQueue.Formatter = new ActiveXMessageFormatter();
Once the message queues have been opened, their transactional state needs to be validated for consistency. A transactional input queue requires a transactional error and output queue and vice versa. If it is determined that a MessageQueueTransaction is required for the Send and Receive functions, then one is created:
if (workerInstance.WorkerInfo.Transactions == WorkerFormatter.SFTransactions.NotRequired)
{
transactionalQueue = false;
}
else
{
transactionalQueue = true;
}
if ((inputQueue.Transactional != transactionalQueue)
|| (errorQueue.Transactional != transactionalQueue))
{
throw new ApplicationException
("Queues do not have Consistent Transactional Status");
}
// if require transactions create a message queue transaction
if (transactionalQueue)
{
queueTransaction = new MessageQueueTransaction();
}
else
{
queueTransaction = null;
}
Once the message queue references and transactions are defined, a thread that runs the ProcessMessages method is created for the actual processing function. Within the .NET Framework, threading is easily accomplished using the System.Threading namespace:
messageProcessor = new Thread(new ThreadStart(ProcessMessages));
messageProcessor.Start();
The ProcessMessages function is a processing loop based upon a Boolean value. When set to false the process loop will terminate. Thus the StopService method of the thread object merely sets this Boolean value and then joins the thread with the main thread, in addition to closing the open message queues:
workerRunning = false;
// join the service thread and the processing thread
if (messageProcessor != null)
{
messageProcessor.Join();
}
if (transactionalQueue)
{
queueTransaction.Dispose();
}
inputQueue.Close();
errorQueue.Close();
The PauseService method merely sets a Boolean value that causes the processing thread to sleep for half a second:
if (workerPaused)
Thread.Sleep(500);
Finally, each of the StartService, StopService, PauseService, and ContinueService methods calls the corresponding abstract OnStart, OnStop, OnPause, or OnContinue method. These abstract methods provide the hooks for implemented classes to capture and release required resources.
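The flag-driven control of the processing thread can be sketched without any message queues. The LoopWorker class is a hypothetical stand-in for WorkerThread: it counts loop iterations instead of receiving messages, and the sleep intervals are arbitrary. The flags are declared volatile here so that writes from the controlling thread are visible to the processing thread; whether the original code does the same is not shown in the article.

```csharp
using System;
using System.Threading;

public class LoopWorker
{
    private volatile bool workerRunning;
    private volatile bool workerPaused;
    private Thread messageProcessor;
    private int iterations;

    // number of loop iterations that performed "work"
    public int Iterations
    {
        get { return Volatile.Read(ref iterations); }
    }

    public void StartService()
    {
        workerRunning = true;
        messageProcessor = new Thread(new ThreadStart(ProcessMessages));
        messageProcessor.Start();
    }

    public void PauseService() { workerPaused = true; }

    public void ContinueService() { workerPaused = false; }

    public void StopService()
    {
        // ask the loop to finish, then wait for the thread to exit
        workerRunning = false;
        if (messageProcessor != null)
        {
            messageProcessor.Join();
        }
    }

    private void ProcessMessages()
    {
        while (workerRunning)
        {
            if (workerPaused)
            {
                // paused: idle briefly, then re-check both flags
                Thread.Sleep(50);
                continue;
            }
            Interlocked.Increment(ref iterations);   // stands in for work
            Thread.Sleep(1);
        }
    }
}
```

Because each control method only sets a flag (or joins a thread that is already finishing), the service control handlers return quickly, which is the non-blocking behavior described earlier.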
The ProcessMessages loop has the following basic structure:
- Receive a Message.
- If a Message has a successful Receive, call the abstract ProcessMessage method.
- If the Receive or ProcessMessage method fails, send the Message into an error queue.
The contents of this processing loop are as follows:
// if a transaction is required create one and receive the message
MessageTransactionStart();
MessageReceive();
// once have a message call the process message abstract method
// any error at this point will force a send to the error queue
if (inputMessage != null)
{
// if a terminate error is caught the transaction is aborted
try
{
// call the method to process the message
ProcessMessage();
}
// catch error thrown where exception status known
catch (WorkerThreadException ex)
{
ProcessError(ex.Terminate, ex.Message);
}
// catch an unknown exception and call terminate
catch (Exception ex)
{
ProcessError(true, ex.Message);
}
// successfully completed a processing of a message
// this includes writing to the error queue
MessageTransactionComplete(true);
}
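The receive, process, and error-queue flow of this loop can be sketched with in-memory collections in place of MSMQ. Everything in this block is hypothetical: the InMemoryLoop class, the use of strings as messages, and the rule that an empty message fails; transactions are left out entirely.

```csharp
using System;
using System.Collections.Generic;

public class InMemoryLoop
{
    public readonly Queue<string> Input = new Queue<string>();
    public readonly List<string> Processed = new List<string>();
    public readonly List<string> Errors = new List<string>();

    // Stand-in for the abstract ProcessMessage method: a derived
    // class would normally supply the real work.
    protected virtual void ProcessMessage(string message)
    {
        if (message.Length == 0)
        {
            throw new ArgumentException("Empty message");
        }
        Processed.Add(message);
    }

    // Receives each message in turn; a failure in processing sends
    // the message to the error list instead of stopping the loop.
    public void Drain()
    {
        while (Input.Count > 0)
        {
            string message = Input.Dequeue();
            try
            {
                ProcessMessage(message);
            }
            catch (Exception)
            {
                Errors.Add(message);
            }
        }
    }
}
```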
The MessageTransactionStart and MessageTransactionComplete methods handle the transactional requirements of the messaging. If the message queues are transactional, a MessageQueueTransaction is initiated and used for both the Receive and Send operations. Depending on the outcome of these operations, the transaction will then be committed or aborted. The ProcessError method sends the erroneous message to the error queue. In addition, it may throw an exception to abnormally terminate the thread. It performs this action if a terminate error, of type WorkerThreadException, was thrown by the ProcessMessage method, if an unknown exception was caught, or if the message cannot be written to the error queue. The code for the functions called in the ProcessMessages loop is as follows:
// method to process the failed message
private void ProcessError(bool terminateProcessing, string logMessage)
{
// attempt to send the failed error to the error queue
// upon failure to write to the error queue log the error
try
{
MessageSend(errorQueue);
}
catch (Exception ex)
{
LogError
("Message error : " + inputMessage.Label + " : " + ex.Message);
// as one cannot write to error queue terminate the thread
terminateProcessing = true;
}
// if required terminate the thread and associated worker
if (terminateProcessing)
{
// abort transaction as cannot place message into the error queue
MessageTransactionComplete(false);
LogError("Thread Terminated Abnormally : " + logMessage);
// an error here should also terminate the processing thread
workerInstance.StopOnError();
throw new ApplicationException("Terminate Thread");
}
}
// message queue send method that will honour the transaction in process
// protected to allow derived classes to send to remote queues
protected void MessageSend(MessageQueue messageQueue)
{
if (queueTransaction == null)
{
messageQueue.Send(inputMessage);
}
else
{
messageQueue.Send(inputMessage, queueTransaction);
}
}
// message queue receive method that will honour transaction in process
// protected to allow derived classes to read from other queues
protected void MessageReceive()
{
try
{
if (queueTransaction == null)
{
inputMessage = inputQueue.Receive(queueTimeout);
}
else
{
inputMessage =
inputQueue.Receive(queueTimeout, queueTransaction);
}
}
catch (MessageQueueException ex)
{
// set the message to null as not to be processed
inputMessage = null;
// as message has not been read terminate the transaction
MessageTransactionComplete(false);
// look at the error code and see if there was a timeout
// if not a timeout throw an error and log the error number
if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
{
LogError("Error : " + ex.Message);
// an error here should also terminate processing thread
workerInstance.StopOnError();
throw;
}
}
}
// start any required message queue transaction
private void MessageTransactionStart()
{
try
{
if (transactionalQueue)
{
queueTransaction.Begin();
}
}
catch(Exception ex)
{
LogError("Cannot Create Message Transaction " +
perfCounterThreadName + ": " + ex.Message);
// an error here should also terminate processing thread
workerInstance.StopOnError();
throw;
}
}
// complete the message queue transaction
// based on the existence of a transaction and its success requirement
private void MessageTransactionComplete(bool transactionSuccess)
{
if (transactionalQueue)
{
try
{
// committing or aborting a transaction must be successful
if (transactionSuccess)
{
queueTransaction.Commit();
}
else
{
queueTransaction.Abort();
}
}
catch (Exception ex)
{
LogError("Cannot Complete Message Transaction " +
perfCounterThreadName + ": " + ex.Message);
// an error here should also terminate processing thread
workerInstance.StopOnError();
throw;
}
}
}
To better handle transactional consistency of the message Receive and Send, the abstract worker class provides MessageReceive and MessageSend functions. These functions receive or send the message within the current MessageQueueTransaction when one exists and without a transaction otherwise, thus honoring the transactional requirements.
The WorkerThread Derived Classes
Any class that inherits from WorkerThread must override the OnStart, OnStop, OnPause, OnContinue, and ProcessMessage methods. The purpose of the OnStart and OnStop methods is to acquire and release processing resources. The OnPause and OnContinue methods are provided to allow the temporary release and reacquisition of these resources. The ProcessMessage method should process a single message, throwing a WorkerThreadException exception in the event of a failure.
As the WorkerThread constructor defines runtime parameters, the derived classes must call the base class constructor:
public WorkerThreadRoundRobin(WorkerInstance workerInstance, WorkerThreadFormatter workerThreadFormatter)
: base (workerInstance, workerThreadFormatter) {}
Derived classes are provided for three types of processing: message forwarding to a single queue, message forwarding to a queue list, and calling a component method for each message. The two implementations that forward messages to a single queue use a round-robin technique or an application offset held in the message AppSpecific property, as the determining factor for which queue to use. The configuration file in this scenario should contain a list of queue paths. The implemented OnStart and OnStop methods should open and close a reference to these queues:
queueCount = threadDefinition.OutputName.Length;
outputQueues = new MessageQueue[queueCount];
// open each output queue
for (int idx=0; idx<queueCount; idx++)
{
outputQueues[idx] = new
MessageQueue(threadDefinition.OutputName[idx]);
outputQueues[idx].Formatter = new ActiveXMessageFormatter();
// validate the transactional property of the queue
if (outputQueues[idx].Transactional != transactionalQueue)
{
throw new ApplicationException
("Queues do not have consistent Transactional status");
}
}
In these scenarios, processing the message is simple: send the message to the required output queue using the abstract base class MessageSend function. In a round-robin situation, this process would be:
try
{
// attempt to send the message to the output queue
MessageSend(outputQueues[queueNext]);
}
catch (Exception ex)
{
// on error, raise a critical error to terminate the thread and worker
throw new WorkerThreadException(ex.Message, true);
}
// calculate the next queue number
queueNext++;
queueNext %= queueCount;
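The two ways of choosing a single destination queue, round-robin and the AppSpecific offset, can be sketched in isolation. The following is a minimal illustration only, not code from the download: the QueueSelector class and its method names are hypothetical, and rejecting an out-of-range AppSpecific value is an assumption about how one might treat a bad offset.

```csharp
using System;

// Hypothetical helper: returns the index of the destination queue.
public class QueueSelector
{
    private readonly int queueCount;
    private int queueNext = 0;

    public QueueSelector(int queueCount)
    {
        if (queueCount <= 0)
        {
            throw new ArgumentOutOfRangeException("queueCount");
        }
        this.queueCount = queueCount;
    }

    // round-robin: return the current index, then advance and wrap
    public int NextRoundRobin()
    {
        int current = queueNext;
        queueNext = (queueNext + 1) % queueCount;
        return current;
    }

    // AppSpecific: treat the value as an offset into the queue list
    // (assumption: an offset outside the list is rejected)
    public int FromAppSpecific(int appSpecific)
    {
        if (appSpecific < 0 || appSpecific >= queueCount)
        {
            throw new ArgumentOutOfRangeException("appSpecific");
        }
        return appSpecific;
    }
}

public class QueueSelectorDemo
{
    public static void Main()
    {
        QueueSelector selector = new QueueSelector(3);
        // prints "0 1 2 0 1 "
        for (int i = 0; i < 5; i++)
        {
            Console.Write(selector.NextRoundRobin() + " ");
        }
        Console.WriteLine();
        // prints "2"
        Console.WriteLine(selector.FromAppSpecific(2));
    }
}
```

In a worker thread, the returned index would be used to pick the entry in the outputQueues array passed to MessageSend.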
The implementation that forwards messages to a queue list is very similar to the previous implementation, except that the message is forwarded to all output queues instead of a deterministic single queue:
try
{
// attempt to send the message to all the output queues
for (int idx=0; idx<queueCount; idx++)
{
MessageSend(outputQueues[idx]);
}
}
catch (Exception ex)
{
// on error, raise a critical error to terminate the thread and worker
throw new WorkerThreadException(ex.Message, true);
}
The last implementation, calling a component method with the message parameters, is a little more interesting. Using the IProcessMessage interface, the ProcessMessage method calls into a .NET component. The OnStart and OnStop methods obtain and release a reference to this component.
The configuration file in this scenario should contain two items: the full class name and the location of the file in which the class resides. The Process method is called on the component as defined in the IProcessMessage interface.
To obtain the object reference, the Activator.CreateInstance method is used. This method requires a Type, in this case obtained from the assembly file path and the class name. Once an object reference is obtained, it is cast to the appropriate interface:
private IProcessMessage messageProcessor;
private string messageProcessorLocation, messageProcessorName;
// obtain the assembly path and type name
messageProcessorLocation = threadDefinition.OutputName[0];
messageProcessorName = threadDefinition.OutputName[1];
// obtain a reference to the required object
Type typSample = Assembly.LoadFrom
(messageProcessorLocation).GetType(messageProcessorName);
object messageProcessorObject = Activator.CreateInstance(typSample);
// cast to the required interface on the object
messageProcessor = (IProcessMessage)messageProcessorObject;
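The same late-binding steps can be tried without a separate assembly file. The sketch below is illustrative only: ISampleProcessor and EchoProcessor are hypothetical stand-ins for IProcessMessage and a real component, and the type is looked up in the assembly that already contains it rather than through Assembly.LoadFrom. It also uses the GetType overload that throws when the type name is not found, since the single-argument overload returns null in that case.

```csharp
using System;
using System.Reflection;

// Hypothetical stand-in for the IProcessMessage interface.
public interface ISampleProcessor
{
    string Process(string messageLabel);
}

// Hypothetical component; needs a public parameterless constructor.
public class EchoProcessor : ISampleProcessor
{
    public string Process(string messageLabel)
    {
        return "Processed " + messageLabel;
    }
}

public class LateBindingDemo
{
    // create an instance by type name and cast it to the interface
    public static ISampleProcessor Create(Assembly assembly, string typeName)
    {
        // second argument true: throw if the type name cannot be resolved
        Type type = assembly.GetType(typeName, true);
        object instance = Activator.CreateInstance(type);
        ISampleProcessor processor = instance as ISampleProcessor;
        if (processor == null)
        {
            throw new InvalidCastException(
                typeName + " does not implement ISampleProcessor");
        }
        return processor;
    }

    public static void Main()
    {
        // look the type up by name in the assembly that defines it
        Type known = typeof(EchoProcessor);
        ISampleProcessor processor = Create(known.Assembly, known.FullName);
        // prints "Processed order"
        Console.WriteLine(processor.Process("order"));
    }
}
```

Failing at the lookup step means a misspelled class name in the configuration file is reported by name, rather than surfacing later as a null argument passed to Activator.CreateInstance.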
When an object reference is obtained, the ProcessMessage method calls the Process method on the IProcessMessage interface:
ProcessMessageReturn processReturnValue;
// attempt to call the required Process method
try
{
// define the parameters for the method call
string messageLabel = inputMessage.Label;
string messageBody = (string)inputMessage.Body;
int messageAppSpecific = inputMessage.AppSpecific;
// call the method and catch the return code
processReturnValue = messageProcessor.Process
(messageLabel, messageBody, messageAppSpecific);
}
catch (InvalidCastException ex)
{
// an error casting the message details is treated as non-critical
throw new WorkerThreadException(ex.Message, false);
}
catch (Exception ex)
{
// if an error calling the assembly, terminate the thread processing
throw new WorkerThreadException(ex.Message, true);
}
// if no error review the return status of the object call
switch (processReturnValue)
{
case ProcessMessageReturn.ReturnBad:
throw new WorkerThreadException
("Unable to process message: Message marked bad", false);
case ProcessMessageReturn.ReturnAbort:
throw new WorkerThreadException
("Unable to process message: Process terminating", true);
default:
break;
}
The example component provided writes the message body into a database table. In this case, the desired result of a severe database error is to abort processing, whereas any other failure marks the message as erroneous.
Since the instance of the class created for this example may acquire and hold expensive database resources, the OnPause and OnContinue methods release and reacquire the object reference.
Instrumentation
As in all good applications, instrumentation is provided to monitor the status of the application. The .NET Framework has greatly simplified the inclusion of event logging, performance counters, and Windows Management Instrumentation (WMI) in applications. The messaging application uses event logging and performance counters, both from the System.Diagnostics namespace.
Within the ServiceBase class automatic event logging can be enabled. In addition, the ServiceBase EventLog member supports writing to the Application event log:
EventLog.WriteEntry(logMessage, EventLogEntryType.Information);
For an application to write to event logs other than the Application log, it can easily create and obtain a reference to an EventLog source, as is done in the WorkerInstance class, and then use the WriteEntry method to record log entries:
private EventLog eventLog;
private bool eventLogCreated = false;
private string eventLogName = "Application";
private string eventLogSource = ServiceControl.ServiceControlName;
// see if the source exists creating it if not
if (!EventLog.SourceExists(eventLogSource))
{
eventLogCreated = true;
EventLog.CreateEventSource(eventLogSource, eventLogName);
}
// create the log object and reference the now defined source
eventLog = new EventLog();
eventLog.Source = eventLogSource;
// write an entry to inform successful creation
eventLog.WriteEntry("Successfully Created", EventLogEntryType.Information);
Performance counters have been greatly simplified by the .NET Framework. This messaging application provides counters that track the total number and number per second of processed messages for each processing thread, the worker from which the thread derived, and the service as a whole. To provide this functionality, one has to define the performance counter categories and then increment corresponding counter instances.
Within the Service OnStart method, the performance counter category is defined. This category holds the two counters: total messages and messages processed per second:
string perfCounterCatName = "MSDN Message Service";
string perfCounterTotName = "Messages/Total";
string perfCounterSecName = "Messages/Second";
CounterCreationDataCollection messagePerfCounters =
new CounterCreationDataCollection();
messagePerfCounters.Add(new CounterCreationData(perfCounterTotName,
"Total Messages Processed", PerformanceCounterType.NumberOfItems64));
messagePerfCounters.Add(new CounterCreationData(perfCounterSecName,
"Messages a Second", PerformanceCounterType.RateOfCountsPerSecond32));
PerformanceCounterCategory.Create(perfCounterCatName,
"MSDN Message Service Sample Counters", messagePerfCounters);
Once the performance counter categories are defined, a PerformanceCounter object is created to provide access to counter instance functionality. The PerformanceCounter object requires the category and counter name and an optional instance name. For the worker process using the process name from the XML configuration file, the code is as follows:
string perfCounterWorkerName = threadDefinition.ProcessName;
perfCounterTotWorker = new PerformanceCounter(
perfCounterCatName, perfCounterTotName, perfCounterWorkerName, false);
perfCounterSecWorker = new PerformanceCounter(
perfCounterCatName, perfCounterSecName, perfCounterWorkerName, false);
perfCounterTotWorker.RawValue = 0;
perfCounterSecWorker.RawValue = 0;
Incrementing the counters is then simply a matter of calling the appropriate method:
perfCounterTotWorker.IncrementBy(1);
perfCounterSecWorker.IncrementBy(1);
On a final note, when the service is stopped, the installed performance counter category should be deleted from the system:
PerformanceCounterCategory.Delete(perfCounterCatName);
Deleting the performance counter category when the service is stopped means that the service performance counters only exist while the service is running. The reasoning behind this is that the XML configuration file is processed during the service start operation, which is when the performance counter instances become known. Stopping and starting the service forces the XML configuration file to be reprocessed.
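One consequence of this approach is that a service that ends without running its stop logic can leave the category registered, and PerformanceCounterCategory.Create raises an exception when the category already exists. A possible guard is sketched below. It is not taken from the sample code: the CounterSetup class and its Recreate method are hypothetical names, and the sketch assumes a Windows machine, the .NET Framework, and an account with rights to register counters.

```csharp
using System.Diagnostics;

// Hypothetical helper: registers the category, replacing any stale copy.
public class CounterSetup
{
    public static void Recreate(string categoryName, string categoryHelp,
        CounterCreationDataCollection counters)
    {
        // remove a category left behind by an earlier, unclean shutdown
        if (PerformanceCounterCategory.Exists(categoryName))
        {
            PerformanceCounterCategory.Delete(categoryName);
        }
        // register the category with the supplied counter definitions
        PerformanceCounterCategory.Create(categoryName, categoryHelp, counters);
    }
}
```

With the names used earlier, the call would be CounterSetup.Recreate(perfCounterCatName, "MSDN Message Service Sample Counters", messagePerfCounters).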
Language Neutrality
As stated earlier, by utilizing the .NET Framework one can write highly functional Windows and Web applications and services using C#, Visual Basic .NET, or Managed C++ (MC++). Within this message queuing service application, the functionality for implementing a Windows service, managing threads, processing message queues, performance monitoring, event logging, exception handling, and processing XML all comes from the .NET Framework.
To prove this statement, the downloadable code includes a Visual Basic .NET version of the message queuing service application. As an example, the full implementation of the derived class that processes messages by calling a component through the IProcessMessage interface is as follows:
' implementation that calls into an assembly
' that implements the IProcessMessage interface
Friend Class WorkerThreadAssembly
Inherits WorkerThread
' reference to the required object interface
Private iwmSample As IProcessMessage
' reference to the assembly information
Private sFilePath, sTypeName As String
' calls the base class constructor
Public Sub New (ByVal cParent As WorkerInstance, _
ByVal wfThread As WorkerThreadFormatter)
MyBase.New(cParent, wfThread)
End Sub
' when starting obtain reference to assembly and construct object
Protected Overrides Sub OnStart()
' ensure have the assembly path and type name
sFilePath = wfThread.OutputName(0)
sTypeName = wfThread.OutputName(1)
' obtain a reference to the required object
InitObject()
End Sub
' when stopping release the resources
Protected Overrides Sub OnStop()
ReleaseObject()
End Sub
' override the pause and continue methods
Protected Overrides Sub OnPause()
' when pausing release the object as may be paused for a while
ReleaseObject()
End Sub
Protected Overrides Sub OnContinue()
' after a pause re-initialize the object
InitObject()
End Sub
' method to initialize the object
Private Sub InitObject()
' obtain a reference to the required object
Dim asmSample As [Assembly] = [Assembly].LoadFrom(sFilePath)
Dim typSample As [Type] = asmSample.GetType(sTypeName)
Dim objSample As Object = Activator.CreateInstance(typSample)
' cast to the required interface on the object
iwmSample = CType(objSample, IProcessMessage)
End Sub
' method to release the object
Private Sub ReleaseObject()
iwmSample.Release()
iwmSample = Nothing
End Sub
' method to perform the processing of the message
Protected Overrides Sub ProcessMessage()
' return value from the process call
Dim wbrSample As ProcessMessageReturn
' attempt to call the required Process method
Try
' define the parameters for the method call
Dim sLabel As String = mInput.Label
Dim sBody As String = mInput.Body
Dim iAppSpecific As Integer = mInput.AppSpecific
' call the method and catch the return code
wbrSample = iwmSample.Process(sLabel, sBody, iAppSpecific)
Catch ex As InvalidCastException
' error casting message details force a non critical error
Throw New WorkerThreadException(ex.Message, False)
Catch ex As Exception
' error calling the assembly terminate the thread processing
Throw New WorkerThreadException(ex.Message, True)
End Try
' if no error review the return status of the object call
Select Case wbrSample
Case ProcessMessageReturn.ReturnBad
Throw New WorkerThreadException _
("Unable to process message: Marked bad", False)
Case ProcessMessageReturn.ReturnAbort
Throw New WorkerThreadException _
("Unable to process message: Terminating", True)
End Select
End Sub
End Class
To further extend the concept of language neutrality, a sample MC++ component that implements the IProcessMessage interface is also provided with the downloadable code. Thus, from the Visual Basic .NET implementation of the Windows service, an MC++ component that implements a C# interface can be consumed.
The sample MC++ component implementation is broken down into a header and source file. The class declaration is defined in the header file:
#pragma once
#using <System.dll>
#using <System.Data.dll>
#using <MessageInterface.dll>
using namespace System;
using namespace System::Diagnostics;
using namespace System::Data;
using namespace System::Data::SqlClient;
using namespace System::Configuration;
using namespace MSDNMessageService::Interface;
namespace MSDNMessageService
{
namespace MessageSample
{
public __gc class ExampleClass : public IProcessMessage
{
private:
// the event logging class
EventLog *eventLog;
String *eventLogSource;
String *eventLogName;
Boolean eventLogCreated;
public:
ExampleClass()
{
// define the definition for the log class
eventLogSource = S"MSDNMessageService.MessageExample";
eventLogName = S"Application";
// create the event log class
CreateLogClass();
};
~ExampleClass()
{
// delete the event log class
DeleteLogClass();
};
ProcessMessageReturn Process(String *messageLabel, String *messageBody, int messageAppSpecific);
void Release();
private:
void CreateLogClass();
void DeleteLogClass();
void LogInformation(String *logMessage);
void LogError(String *logMessage);
};
};
}
The actual implementation of the component is contained within the source file:
#include "stdafx.h"
#include "MessageExampleMC.h"
namespace MSDNMessageService
{
namespace MessageSample
{
// process the web message by posting it into the database
ProcessMessageReturn ExampleClass::Process(String *messageLabel,
String *messageBody, int messageAppSpecific)
{
// define the return variable
ProcessMessageReturn returnCode =
ProcessMessageReturn::ReturnGood;
// try the database operation and if fails throw an error
try
{
// connect to the database
String *sqlConnection = ConfigurationSettings::
AppSettings->get_Item("SqlConnection");
SqlConnection *conNW = new SqlConnection(sqlConnection);
// setup the command object
SqlCommand *comNW = new SqlCommand
("usp_insert_messageorder", conNW);
comNW->CommandType = CommandType::StoredProcedure;
// define the message label parameter
SqlParameter *labelParam = new SqlParameter
("@MessageLabel", SqlDbType::VarChar, 500);
labelParam->Direction = ParameterDirection::Input;
labelParam->Value = messageLabel;
comNW->Parameters->Add(labelParam);
// define the message body parameter
SqlParameter *bodyParam = new SqlParameter
("@MessageBody", SqlDbType::VarChar, 5000);
bodyParam->Direction = ParameterDirection::Input;
bodyParam->Value = messageBody;
comNW->Parameters->Add(bodyParam);
// define the message App Specific parameter
SqlParameter *appSpecificParam = new SqlParameter
("@AppSpecific", SqlDbType::Int);
appSpecificParam->Direction = ParameterDirection::Input;
appSpecificParam->Value = __box(messageAppSpecific);
comNW->Parameters->Add(appSpecificParam);
// execute the stored procedure
conNW->Open();
comNW->ExecuteNonQuery();
// tidy up the database connection
conNW->Close();
}
catch (SqlException &ex)
{
if (ex.Class > 16) // severity test (Class holds the severity, Number the error number)
{
LogError(ex.Message);
returnCode = ProcessMessageReturn::ReturnAbort;
}
else
{
LogInformation(ex.Message);
returnCode = ProcessMessageReturn::ReturnBad;
}
}
catch (Exception &ex)
{
LogError(ex.Message);
returnCode = ProcessMessageReturn::ReturnAbort;
}
// return the determined status code
return returnCode;
};
// release all resources
void ExampleClass::Release()
{
return;
};
// create a log class for writing to the event log
void ExampleClass::CreateLogClass()
{
try
{
// create event log source if it does not exist
if (!EventLog::SourceExists(eventLogSource))
{
EventLog::CreateEventSource(eventLogSource, eventLogName);
eventLogCreated = true;
}
else
{
eventLogCreated = false;
}
// open the event log
eventLog = new EventLog();
eventLog->Source = eventLogSource;
}
catch (Exception&)
{
// if the log cannot be initialized null event object
eventLog = 0;
}
};
// delete the log class
void ExampleClass::DeleteLogClass()
{
try
{
// close the event log
eventLog->Close();
// if event log source created then delete
if (eventLogCreated)
{
EventLog::DeleteEventSource(eventLogSource);
}
}
catch (Exception&)
{
// nothing to do
}
};
// write an informational message to the event log
void ExampleClass::LogInformation(String *logMessage)
{
try
{
if (eventLog != 0)
{
eventLog->WriteEntry(logMessage,
EventLogEntryType::Information);
}
}
catch (Exception&)
{
// nothing to do
}
};
// write an error message to the event log
void ExampleClass::LogError(String *logMessage)
{
try
{
if (eventLog != 0)
{
eventLog->WriteEntry(logMessage,
EventLogEntryType::Error);
}
}
catch (Exception&)
{
// nothing to do
}
};
};
};
When comparing the Managed C++ code to that of the C# sample, there are very few differences, even though the MC++ code is broken down into a header and a source file. Besides the use of pointers and the scope resolution operator (::), the main difference is the use of the __box keyword. This boxing operation creates a managed object, derived from System.ValueType, from a __value class object.
This is required when assigning value types to the SqlParameter Value property. The int value type requires boxing, unlike the String type, which is already a managed object derived from System.Object.
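For comparison, C# performs the same boxing operation implicitly when a value type is assigned to an object reference, so no keyword is needed. The following minimal sketch, written for illustration and not part of the download, shows that the boxed object is an independent copy and that a string involves no boxing:

```csharp
using System;

public class BoxingDemo
{
    public static void Main()
    {
        int appSpecific = 42;

        // implicit boxing: the value is copied into a managed object
        object boxed = appSpecific;

        // changing the original does not change the boxed copy
        appSpecific = 7;

        // explicit unboxing back to the value type
        int unboxed = (int)boxed;

        Console.WriteLine(boxed.GetType().FullName);  // System.Int32
        Console.WriteLine(unboxed);                   // 42

        // a string is already a reference type, so nothing is boxed
        object label = "label";
        Console.WriteLine(boxed is ValueType);        // True
        Console.WriteLine(label is ValueType);        // False
    }
}
```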
Installation
Before concluding, a brief mention is warranted about installation and a utility called installutil.exe. As this application is a Windows service, it must be installed using this utility. To facilitate this, a class is required that inherits the Installer class from the System.Configuration.Install assembly:
[RunInstaller(true)]
public class ServiceRegister: Installer
{
private ServiceInstaller serviceInstaller;
private ServiceProcessInstaller processInstaller;
public ServiceRegister()
{
// define and create the service installer
serviceInstaller = new ServiceInstaller();
serviceInstaller.StartType = ServiceStartMode.Manual;
serviceInstaller.ServiceName = ServiceControl.ServiceControlName;
serviceInstaller.DisplayName = ServiceControl.ServiceControlDesc;
Installers.Add(serviceInstaller);
// define and create the process installer
processInstaller = new ServiceProcessInstaller();
#if RUNUNDERSYSTEM
processInstaller.Account = ServiceAccount.LocalSystem;
#else
// prompt for user and password on install
processInstaller.Account = ServiceAccount.User;
processInstaller.Username = null;
processInstaller.Password = null;
#endif
Installers.Add(processInstaller);
}
}
As this sample class demonstrates, a Windows service requires one installer for the service and another for the service process, the latter defining the account under which the service will run. In this instance, the service runs under the LocalSystem account when the RUNUNDERSYSTEM symbol is defined; otherwise a user account is chosen. If a user account is chosen, the installer will prompt for a user name and password during the installation, unless both are supplied. Other installers allow for the registration of resources such as event logs and performance counters.
Conclusion
As one can see from this sample .NET Framework application, what was previously only in the realm of Visual C++ programmers is now possible in a simple object-oriented program. The new .NET Framework enables one to develop highly functional, scalable Windows applications and services from any programming language. Although this article focuses on C#, the same functionality was easily achieved using Visual Basic .NET or Managed C++.
Not only has the new Framework simplified and extended the programming possibilities, it has also made often-forgotten application instrumentation, such as performance monitor counters and event log notifications, simple to incorporate into applications. This also applies to Windows Management Instrumentation, although it is not used in this application.
References
- Architectural Design: A Scalable, Highly Available Business Object Architecture
- MSMQ: A Scalable, Highly Available Load-Balancing Solution
- CSharp Introduction and Overview
- CSharp Language Reference
- MSDN Online .NET Information
Carl Nolan works in Northern California at the Microsoft Technology Center Silicon Valley. This center focuses on the development of .NET solutions using the Windows .NET platform. He can be reached at carlnol@microsoft.com.