Processing multiple queues in FIFO order with WCF
In my previous entry on Processing FIFO MSMQ Messages using WCF I said that I would cover processing multiple queues and processing each queue in FIFO order.
If you remember, our requirement was to process messages we receive from a legacy system through MSMQ in FIFO order. Well, we found a couple of issues, neither of them a surprise: FIFO processing is slow, and it does not scale out.
So we did a little more investigation and found that we could segment our processing by message type, with each type sent through its own queue, and then process each queue in FIFO order. The FIFO processing is the same as in the previous blog post. This entry focuses on dynamically creating a service host for each of the queues.
We need to be able to add queue bindings in the .config file and have the application pick up the new queue without modifying code, and without the application knowing in advance how many queues it will monitor.
With this in mind, the .config file looks like this:
<services>
  <service name="EventManagement.MSMQEventServiceTypeX">
    <endpoint address="msmq.formatname:DIRECT=OS:.\private$\EventManagementQueueX"
              binding="msmqIntegrationBinding"
              bindingConfiguration="EventProcessorBinding"
              behaviorConfiguration="BatchingBehavior"
              contract="EventManagement.IMSMQEventInterface">
    </endpoint>
  </service>
  <service name="EventManagement.MSMQEventServiceTypeY">
    <endpoint address="msmq.formatname:DIRECT=OS:.\private$\EventManagementQueueY"
              binding="msmqIntegrationBinding"
              bindingConfiguration="EventProcessorBinding"
              behaviorConfiguration="BatchingBehavior"
              contract="EventManagement.IMSMQEventInterface">
    </endpoint>
  </service>
</services>
The idea is to be able to add a new service entry for each new message type, with a new endpoint address, and have the code spin up a new service host. The code to make this happen looks like this:
using System;
using System.CodeDom.Compiler;
using System.Collections.Generic;
using System.Configuration;
using System.Diagnostics;
using System.Reflection;
using System.ServiceModel;
using System.ServiceModel.Configuration;
using Microsoft.CSharp;

public class EventProcessor
{
    private List<ServiceHost> svcHosts = new List<ServiceHost>();
    private MSMQEventService mqevtsvc;

    public void ProcessMessages()
    {
        mqevtsvc = new MSMQEventService();
        ServicesSection SvcSec = (ServicesSection)ConfigurationManager.GetSection("system.serviceModel/services");
        string MSMQSvcName = mqevtsvc.GetType().FullName;
        string baseName = ConfigurationManager.AppSettings["EventQueueName"];
        Uri baseAddress = new Uri(baseName);

        // Roll through the service entries and create service hosts to handle them
        foreach (ServiceElement svc in SvcSec.Services)
        {
            if (svc.Name.StartsWith(MSMQSvcName))
            {
                object svcObj = null;
                // Whatever follows the base type name identifies the queue-specific class
                string Suffix = svc.Name.Substring(MSMQSvcName.Length);
                if (Suffix == string.Empty)
                {
                    // No suffix: the entry names the base service type itself
                    svcObj = mqevtsvc;
                }
                else
                {
                    svcObj = CreateMSMQObj(Suffix);
                }

                if (svcObj == null)
                {
                    ErrorHandler.Instance.OnError("Error creating service type " + mqevtsvc.GetType().Name + Suffix, EventLogEntryType.Warning, ErrorHandler.WarningNumber.ServiceStarting);
                }
                else
                {
                    ServiceHost newSvc = new ServiceHost(svcObj, baseAddress);
                    newSvc.Faulted += new EventHandler(serviceHost_Faulted);
                    svcHosts.Add(newSvc);
                }
            }
            else
            {
                ErrorHandler.Instance.OnError(string.Format("Service name {0} must begin with {1}", svc.Name, MSMQSvcName), EventLogEntryType.Warning, ErrorHandler.WarningNumber.ServiceStarting);
            }
        }

        // Open each ServiceHost to create listeners and start listening for messages.
        foreach (ServiceHost svcHost in svcHosts)
        {
            svcHost.Open();
        }
    }

    // Compiles an empty subclass named MSMQEventService<SuffixName> in memory
    // and returns an instance of it, or null if compilation fails.
    private object CreateMSMQObj(string SuffixName)
    {
        CSharpCodeProvider loCompiler = new CSharpCodeProvider();
        CompilerParameters loParameters = new CompilerParameters();
        loParameters.ReferencedAssemblies.Add("System.dll");
        loParameters.ReferencedAssemblies.Add(this.GetType().Assembly.Location);
        loParameters.ReferencedAssemblies.Add(typeof(ServiceHost).Assembly.Location);
        loParameters.GenerateInMemory = true;

        string Code = @"
using System;
using System.ServiceModel;
namespace EventManagement
{
    public class MSMQEventService{0} : MSMQEventService, IMSMQEventInterface, IDisposable
    {
    }
}
";
        Code = Code.Replace("{0}", SuffixName);

        CompilerResults loCompiled = loCompiler.CompileAssemblyFromSource(loParameters, Code);
        if (loCompiled.Errors.HasErrors)
        {
            // ...add code here to process errors
            // The compiled assembly is not usable after a failed compile,
            // so return null and let the caller log the warning.
            return null;
        }

        Assembly loAssembly = loCompiled.CompiledAssembly;
        object loObject = loAssembly.CreateInstance("EventManagement.MSMQEventService" + SuffixName);
        return loObject;
    }
}
There are two interesting parts to this code. The first is at the top where the .config file is read. The second is in the CreateMSMQObj private method.
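When the .config file is read, the name attribute of each service element is compared with the full type name of MSMQEventService. Whatever follows that base name (TypeX or TypeY in the example above) becomes the suffix of the class that is handed to the ServiceHost.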
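To make the name matching concrete, here is a minimal sketch of just the suffix logic, pulled out of the WCF plumbing so it can be run on its own. ServiceNameMatcher and GetSuffix are hypothetical names used only for illustration; the real code does this inline with StartsWith and Substring.

```csharp
using System;

public static class ServiceNameMatcher
{
    // Returns the text that follows baseTypeName in serviceName.
    // Returns an empty string when the two names are identical, and
    // null when serviceName does not start with baseTypeName at all.
    public static string GetSuffix(string serviceName, string baseTypeName)
    {
        if (!serviceName.StartsWith(baseTypeName, StringComparison.Ordinal))
        {
            return null;
        }
        return serviceName.Substring(baseTypeName.Length);
    }
}
```

With the .config file shown earlier, the entry named EventManagement.MSMQEventServiceTypeX yields the suffix TypeX, an entry that names the base type exactly yields an empty suffix (so the existing instance is reused), and any other service name yields null, which corresponds to the warning branch.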
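In the CreateMSMQObj method, we dynamically compile an empty subclass of MSMQEventService for each suffixed service element from the .config file. By default, WCF looks up a host's service configuration by the service type's name, so each queue needs its own class name even though the processing logic is shared.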
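As an aside, the same "one empty subclass per queue" idea can be sketched without compiling source text. The example below is an alternative, not what CreateMSMQObj does: it uses System.Reflection.Emit to define the subclass directly. BaseService stands in for MSMQEventService, and DynamicSubclass.Create is a hypothetical helper. It assumes a runtime where the static AssemblyBuilder.DefineDynamicAssembly is available (.NET Framework 4.5 or later, or modern .NET), and a public base class with a public parameterless constructor.

```csharp
using System;
using System.Reflection;
using System.Reflection.Emit;

namespace Sketch
{
    // Stand-in for MSMQEventService (hypothetical).
    public class BaseService
    {
    }

    public static class DynamicSubclass
    {
        // Defines an empty public class named <baseType.FullName><suffix>
        // that derives from baseType, and returns the finished type.
        public static Type Create(Type baseType, string suffix)
        {
            AssemblyBuilder assembly = AssemblyBuilder.DefineDynamicAssembly(
                new AssemblyName("DynamicServices" + suffix),
                AssemblyBuilderAccess.Run);
            ModuleBuilder module = assembly.DefineDynamicModule("Main");

            TypeBuilder typeBuilder = module.DefineType(
                baseType.FullName + suffix,
                TypeAttributes.Public | TypeAttributes.Class,
                baseType);

            // Emit a public parameterless constructor that calls the base one.
            typeBuilder.DefineDefaultConstructor(MethodAttributes.Public);

            return typeBuilder.CreateType();
        }
    }
}
```

An instance can then be created with Activator.CreateInstance on the returned type. Interfaces implemented by the base class are inherited, so they do not need to be declared again on the generated type. Whether this is preferable to the CodeDom approach depends on the target runtime; it has not been tested here against a ServiceHost.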
At this point we have a system that can dynamically read from multiple queues, defined at runtime, and process the messages in each queue in FIFO order.
This is not the full source code; some of the processing code has been taken out for brevity. Hopefully, this provides a glimpse and a head start on processing multiple queues in FIFO order.