Multi-Threaded Web Service Calls - A C# Code Sample
Sometimes it becomes necessary to call an external web service from multiple threads to speed up the processing of records held in a buffer, so that the overall time spent waiting for the calls to return is reduced. The high-level steps involved are listed further below.
Though we could have a long philosophical discussion about which design to use, my intention here is simply to demonstrate one way to achieve this with a code sample. Some of the assumptions are:
- All the records that contain the information to be submitted to the external web service are in a common buffer
- Each thread will process multiple records at a time before getting additional data from the buffer for the next iteration
- The code is not optimized for performance and can be worked upon
OK, now with the disclaimers out of the way, let's look at the basic steps involved in putting together a multithreaded routine to call a web service. Broadly, we need to perform the following steps:
- GetData(): Get the data from the buffer for each of the threads that will call the web service
- Process(): Call the web service using the web proxy
- SetData(): Merge the output obtained by each of the threads to a common output buffer
The three main things that need to be kept in mind are:
- Using a mutex (mutual-exclusion lock) to make sure that no two threads get the same set of input data.
- Using a mutex (mutual-exclusion lock) to make sure that no thread overwrites the output of another.
- Making sure that no thread gets zombied and that the main calling thread waits for the others to terminate.
The complete code
/// <summary>
/// Processes a buffer of input records by calling a web service from several
/// worker threads and merging the results into a single output dictionary.
/// </summary>
/// <remarks>
/// All synchronization state (the read position and both lock objects) belongs
/// to the instance, so separate instances never share or reset each other's
/// progress through their input buffers.
/// </remarks>
public sealed class WebProcessingService
{
    // Guards 'indexer' so that no two threads are handed the same input records.
    private readonly object getDataSync = new object();

    // Guards 'outputBuffer' so that concurrent writers cannot corrupt it.
    private readonly object setDataSync = new object();

    // Position of the next unread record in 'inputBuffer'.
    // Only read or written while holding 'getDataSync'.
    private int indexer = 0;

    private string[] inputBuffer = null;

    // Each non-empty web-service result is stored with the result string as both
    // key and value, so identical results collapse into a single entry.
    // If a richer result type is needed, a Dictionary<string, SomeClass> with a
    // unique key per input record (or a plain List<T>) can be used instead.
    private Dictionary<string, string> outputBuffer = null;

    /// <summary>
    /// Initializes a new instance of the <see cref="WebProcessingService"/> class.
    /// </summary>
    /// <param name="inputRecords">Records to submit to the web service.</param>
    /// <exception cref="System.ArgumentNullException">
    /// <paramref name="inputRecords"/> is null.
    /// </exception>
    public WebProcessingService(string[] inputRecords)
    {
        // Fail at the public entry point instead of with a NullReferenceException
        // inside the private buffer setup.
        if (inputRecords == null)
        {
            throw new System.ArgumentNullException("inputRecords");
        }

        this.BuildInternalBuffers(inputRecords);
    }

    /// <summary>
    /// Builds the internal buffers, both the input and the output buffer,
    /// and rewinds the read position to the first record.
    /// </summary>
    /// <param name="inputRecords">Input records</param>
    private void BuildInternalBuffers(string[] inputRecords)
    {
        this.inputBuffer = inputRecords;
        this.outputBuffer = new Dictionary<string, string>(inputRecords.Length);
        this.indexer = 0;
    }

    /// <summary>
    /// Starts the worker threads, waits for all of them to finish and returns
    /// the merged output.
    /// </summary>
    /// <returns>
    /// The output dictionary holding every distinct non-empty web-service result.
    /// </returns>
    /// <exception cref="System.InvalidOperationException">
    /// The configured number of records per thread per iteration is less than 1.
    /// </exception>
    public Dictionary<string, string> ProcessRecords()
    {
        // A batch size of zero would make GetData() hand out empty, non-null
        // batches forever, so the workers would never see end-of-input.
        // Reject it here, before any thread is started.
        if (Settings.Default.recordsPerThreadPerIteration < 1)
        {
            throw new System.InvalidOperationException(
                "recordsPerThreadPerIteration must be at least 1.");
        }

        Thread[] pool = new Thread[Settings.Default.maxThreads];
        for (int i = 0; i < pool.Length; i++)
        {
            // Spawn a new worker thread.
            Thread worker = new Thread(new ThreadStart(this.ProcessCalls));
            // Mark it as a non-background (foreground) thread.
            worker.IsBackground = false;
            worker.Start();
            // Keep the reference so the caller can wait for its termination.
            pool[i] = worker;
        }

        // Make the caller wait for all the threads to terminate.
        foreach (Thread worker in pool)
        {
            worker.Join();
        }

        return this.outputBuffer;
    }

    /// <summary>
    /// Hands out the next batch of unread input records to the calling thread.
    /// </summary>
    /// <returns>
    /// An array of the configured batch size whose leading slots hold the next
    /// unread records (trailing slots are null when fewer records remain),
    /// or null once every input record has been handed out.
    /// </returns>
    private string[] GetData()
    {
        // Only one thread at a time may advance the read position.
        lock (this.getDataSync)
        {
            // Nothing left to hand out: signal end-of-input.
            if (this.indexer >= this.inputBuffer.Length)
            {
                return null;
            }

            string[] tempBuffer = new string[Settings.Default.recordsPerThreadPerIteration];

            // Copy until either the batch is full or the input is exhausted.
            for (int i = 0;
                 i < tempBuffer.Length && this.indexer < this.inputBuffer.Length;
                 i++, this.indexer++)
            {
                tempBuffer[i] = this.inputBuffer[this.indexer];
            }

            return tempBuffer;
        }
    }

    /// <summary>
    /// Worker-thread body: repeatedly fetches a batch of records, calls the web
    /// service for each non-null record and merges the results into the output.
    /// </summary>
    private void ProcessCalls()
    {
        // Each worker thread uses its own proxy instance.
        FooWebService webProxy = new FooWebService();

        while (true)
        {
            string[] buffer = this.GetData();

            // End-of-input: this worker is done.
            if (buffer == null)
            {
                break;
            }

            string[] outTempBuffer = new string[buffer.Length];
            for (int i = 0; i < buffer.Length; i++)
            {
                // Null slots are either batch padding or null input records; skip them.
                if (buffer[i] != null)
                {
                    // NOTE(review): exceptions from the web call are not caught here;
                    // confirm how failures on a worker thread should be surfaced.
                    outTempBuffer[i] =
                        webProxy.SomeMetho(buffer[i]);
                }
            }

            // Once the current batch is done, merge it into the output dictionary.
            this.SetData(outTempBuffer);
        }
    }

    /// <summary>
    /// Merges one batch of web-service results into the shared output dictionary.
    /// </summary>
    /// <param name="tempBuffer">The output records of one batch; null or empty entries are ignored.</param>
    private void SetData(string[] tempBuffer)
    {
        // Only one thread at a time may modify the output dictionary.
        lock (this.setDataSync)
        {
            for (int i = 0; i < tempBuffer.Length; i++)
            {
                string result = tempBuffer[i];
                if (!string.IsNullOrEmpty(result))
                {
                    // Key and value are the same string, so assigning through the
                    // indexer is equivalent to "add if not already present".
                    this.outputBuffer[result] = result;
                }
            }
        }
    }
}