Поделиться через


WCF processes messages from MSMQ out of order using netMsmqBinding

WCF threads are assigned from the CLR ThreadPool to incoming messages based on the value of MaxConcurrentCalls.

In another word, more than one I/O threads from the CLR Threadpool are likely used to handle the incoming messages if MaxConcurrentCalls value is greater than 1.

In a 4.x framework, the default value of MaxConcurrentCalls is set to 16* ProcessorCount.

You may have a WCF service that is reading multiple messages from MSMQ and its MaxConcurrentCalls set to a value higher than 1.

Checking the service method output, you may noticed that the messages are not always processed in order although the behavior on the service implementation is set as follows:

Set InstanceContextMode = Single and ConcurrencyMode = Single.

In order to prevent this from happening, we need to force WCF to use only one thread from the CLR Threadpool to handle all the incoming messages at the time.

Two workarounds allow us to achieve this goal:

1. Set <serviceThrottling maxConcurrentCalls="1" /> in the config file

  1. This will force wcf to assign only one thread to handle all the incoming MSMQ messages.

  2. Hence, this will assure that all the incoming messages are processed sequentially.

  3.  

  4.  

  5.   1  <system.serviceModel>
     2     <bindings>
     3       <netMsmqBinding>
     4         <binding name="msmq" durable="true" exactlyOnce="true" receiveErrorHandling="Drop">
     5           <readerQuotas maxDepth="2147483647" maxStringContentLength="2147483646" maxArrayLength="2147483647" maxBytesPerRead="2147483647" maxNameTableCharCount="2147483647" />
     6           <security mode="None" />
     7         </binding>
     8       </netMsmqBinding>
     9     </bindings>
    10     <services>
    11       <service name="WcfTestContract.ServiceTest">
    12         <endpoint address="net.msmq://localhost/private/WCFTestQueue" binding="netMsmqBinding" contract="WcfTestContract.IServiceTest" bindingConfiguration="msmq"/>
    13         <endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange" />
    14         <host>
    15           <baseAddresses>
    16             <add baseAddress="https://localhost:8733/WcfTestContract/" />
    17           </baseAddresses>
    18         </host>
    19       </service>
    20     </services>
    21     <behaviors>
    22       <serviceBehaviors>
    23         <behavior>
    24           <serviceMetadata httpGetEnabled="True" httpsGetEnabled="True"/>
    25           <serviceDebug includeExceptionDetailInFaults="False" />
    26           <serviceThrottling maxConcurrentCalls="1" />
    27         </behavior>
    28       </serviceBehaviors>
    29     </behaviors>
    30   </system.serviceModel>
    

 

2. Set MaxBatchSize of TransactedBatchingBehavior to 1 and set ReleaseServiceInstanceOnTransactionComplete on the service implementation to false

The below settings will force WCF to use only one thread to handle all the incoming MSMQ messages.

  • Set MaxBatchSize of TransactedBatchingBehavior to 1
    •   1 static void Main(string[] args)
      2         {
      3 
      4             using (System.ServiceModel.ServiceHost serviceHost = new System.ServiceModel.ServiceHost(typeof(WcfTestContract.ServiceTest)))
      5             {
      6 
      7                 TransactedBatchingBehavior batchBehavior = new TransactedBatchingBehavior(1);
      8                 //batchBehavior.MaxBatchSize = 10;
      9                 Type type = batchBehavior.GetType();
      10 
      11                 foreach (ServiceEndpoint current in serviceHost.Description.Endpoints)
      12                 {
      13                     if (!current.Behaviors.Contains(type) && current.Binding.Name.Contains("NetMsmqBinding"))
      14                     {
      15                         current.Behaviors.Add(batchBehavior);
      16                     }
      17                 }
      18 
      19                 serviceHost.Open();
      20                 Console.WriteLine("Press any key to exit...");
      21                 Console.ReadKey();
      22             }
      23   
      24         }
      
    •  

    •  

  • set ReleaseServiceInstanceOnTransactionComplete on the service implementation to false

    •  

    •   1 [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, 
       2         ConcurrencyMode = ConcurrencyMode.Single,ReleaseServiceInstanceOnTransactionComplete = false)]
       3     public class ServiceTest : IServiceTest
       4     {
       5         static int lastSequence = 0;
       6 
       7         [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
       8         public void Test(int sequenceId)
       9         {
      10             Thread thread = Thread.CurrentThread;
      11             int tid = (Int32)thread.ManagedThreadId;
      12 
      13             if (sequenceId != lastSequence + 1)
      14             {
      15                 Console.WriteLine("Out of order! Expected {0} was {1} on thread {2}", lastSequence + 1, sequenceId, tid);
      16 
      17             }
      18             else
      19             {
      20                 Console.WriteLine("In order! Expected {0} was {1} on thread {2}", lastSequence + 1, sequenceId, tid);
      21 
      22             }
      23 
      24 
      25             lock (new object())
      26                   lastSequence = sequenceId;
      27            
      28         }
      29 
      30         
      31     }
      32 
      33 
      

 

 

Client can add messages to MSMQ as follows:

 

  1 class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             using (TransactionScope scope = new TransactionScope())
 6             {
 7                 ServiceTestClient proxy = new ServiceTestClient("NetMsmqBinding_IServiceTest");
 8                 try
 9                 {
10                     
11                     for (int i = 1; i < 500; i++)
12                     {
13                         proxy.Test(i);
14                     }
15 
16                     proxy.Close();
17                     scope.Complete();
18                     
19                 }
20                 catch (TransactionAbortedException ex)
21                 {
22                     Console.WriteLine("\nTransactionAbortedException Error: \n{0}", ex.Message);
23                 }
24                 catch (Exception ex)
25                 {
26                     Console.WriteLine("\nError: \n{0}", ex.Message);
27                     proxy.Abort();
28                 }
29                 
30             }
31         
32             Console.WriteLine("\n\nPress any key to exit");
33             Console.ReadKey();
34         }
35     }