Multi-Threaded Web Service Calls - A C# Code Sample

Sometimes it is necessary to call an external web service from multiple threads to speed up the processing of records held in a buffer, so that the overall time spent waiting for the calls to return is reduced.

Though we could have a long philosophical discussion about which design to use, my intention here is to demonstrate one way to achieve this and provide a code sample. The assumptions are:

  • All the records containing the information to be submitted to the external web service are in a common buffer
  • Each thread processes multiple records at a time before getting additional data from the buffer for the next iteration
  • The code is not optimized for performance and can be improved

OK, with the disclaimers out of the way, let's look at the basic steps involved in putting together a multithreaded routine to call a web service. Broadly, we need to perform the following steps:

  1. GetData(): Get the data from the buffer for each of the threads that will call the web service
  2. Process(): Call the web service using the web proxy
  3. SetData(): Merge the output obtained by each of the threads to a common output buffer

The three main things that need to be kept in mind are:

  1. Using a mutex to make sure that no two threads get the same set of input data.
  2. Using a mutex to make sure that no thread overwrites the output of another.
  3. Making sure that no thread is left orphaned: the main calling thread waits for all the others to terminate.
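Points 1 and 3 can be tried out in isolation, without a web service. The following is a minimal sketch, not the code from this post: the class `BatchDispenserSketch` and its members are hypothetical names, and it uses the C# `lock` statement as a lighter in-process stand-in for a `Mutex`. It hands out disjoint batches to worker threads and has the caller join every thread before returning.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration class; not part of the sample in this post.
public sealed class BatchDispenserSketch
{
    private readonly object gate = new object();
    private readonly string[] input;
    private readonly int batchSize;
    private int next; // index of the first record not yet handed out

    public BatchDispenserSketch(string[] input, int batchSize)
    {
        this.input = input;
        this.batchSize = batchSize;
    }

    // Returns the next unclaimed batch, or null once the input is exhausted.
    public string[] NextBatch()
    {
        lock (gate) // only one thread at a time may advance the index
        {
            if (next >= input.Length)
            {
                return null;
            }
            int count = Math.Min(batchSize, input.Length - next);
            string[] batch = new string[count];
            Array.Copy(input, next, batch, 0, count);
            next += count;
            return batch;
        }
    }

    // Starts threadCount workers that claim batches until none remain,
    // waits for all of them, and returns every record that was claimed.
    public static List<string> Drain(string[] input, int batchSize, int threadCount)
    {
        var dispenser = new BatchDispenserSketch(input, batchSize);
        var claimed = new List<string>();
        var threads = new Thread[threadCount];
        for (int i = 0; i < threads.Length; i++)
        {
            threads[i] = new Thread(() =>
            {
                string[] batch;
                while ((batch = dispenser.NextBatch()) != null)
                {
                    lock (claimed) // protect the shared output list
                    {
                        claimed.AddRange(batch);
                    }
                }
            });
            threads[i].Start();
        }
        foreach (Thread t in threads)
        {
            t.Join(); // the caller waits for every worker to finish
        }
        return claimed;
    }
}
```

Calling `BatchDispenserSketch.Drain(records, 3, 4)` should return each input record exactly once, in an order that depends on thread scheduling.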

The complete code


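    using System.Collections.Generic;
    using System.Threading;
    // Settings (application settings) and FooWebService (the generated web proxy)
    // are assumed to come from the surrounding project.

    /// <summary>
    /// Multithreaded web processor
    /// </summary>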
    public sealed class WebProcessingService
    {
        // setup the mutex for the input buffer
        private readonly static Mutex getDataSyncMutex = new Mutex(false);

        // setup the mutex for the output
        private readonly static Mutex setDataSyncMutex = new Mutex(false);

        // this indexer is used to make sure that the input array bounds are not violated
        // when the threads are getting fed the input; it is an instance field because
        // it tracks the position in this instance's input buffer
        private int indexer = 0;

        string[] inputBuffer = null;
        //Assuming that the records to be processed result in a unique output
        Dictionary<string, string> outputBuffer = null;

        // else we can use the below, but we need to have mechanism to have a unique key for
        // each of the input records. If uniqueness is not necessary, we can just use a List<T>
        //Dictionary<string, SomeClass> outputBuffer = null;

        
        /// <summary>
        /// Initializes a new instance of the <see cref="WebProcessingService"/> class.
        /// </summary>
        public WebProcessingService(string[] inputRecords)
        {
            this.BuildInternalBuffers(inputRecords);
        }
        

        /// <summary>
        /// Builds the internal buffers, both the input and output buffer
        /// </summary>
        /// <param name="inputRecords">Input records</param>
        private void BuildInternalBuffers(string[] inputRecords)
        {
            this.inputBuffer = inputRecords;
            outputBuffer = new Dictionary<string, string>(inputRecords.Length);
            indexer = 0;            
            
        }

        /// <summary>
        /// Initiates the multithreaded processing of web calls
        /// </summary>
        /// <returns></returns>
        public Dictionary<string, string> ProcessRecords()
        {
            
            Thread[] pool = new Thread[Settings.Default.maxThreads];

            for (int i = 0; i < pool.Length; i++)
            {
                //spawn out a new thread
                Thread t = new Thread(new ThreadStart(ProcessCalls));
                //mark it as a non-background thread
                t.IsBackground = false;
                //start
                t.Start();
                //save the reference so that the main thread waits for its termination
                pool[i] = t;
            }
            //make the caller wait for all the threads to terminate
            foreach (Thread t in pool)
            {
                t.Join();
            }
            //return the output
            return outputBuffer;

        }

        /// <summary>
        /// Gets the data.
        /// </summary>
        /// <returns></returns>
        private string[] GetData()
        {
            // acquire the mutex to make sure that only one thread enters the region
            getDataSyncMutex.WaitOne();
            
            try
            {
                // all input records have already been handed out: signal end of input
                if (indexer >= this.inputBuffer.Length)
                {
                    return null;
                }
                // init the temp buffer to copy from the input records
                string[] tempBuffer = new string[Settings.Default.recordsPerThreadPerIteration];
                // copy until the temp buffer is full or the input is exhausted;
                // unused slots stay null and are skipped by the caller
                for (int i = 0; i < tempBuffer.Length && indexer < this.inputBuffer.Length;
                    i++, indexer++)
                {
                    tempBuffer[i] = this.inputBuffer[indexer];
                }
                return tempBuffer;
                
            }
            finally
            {
                //even if there is some exception, this will release the lock
                getDataSyncMutex.ReleaseMutex();
            }
        }

        /// <summary>
        /// Processes the web calls.
        /// </summary>
        private void ProcessCalls()
        {
            //instantiate the proxy
            FooWebService webProxy = new FooWebService();
            //instantiate the temp buffer
            string[] buffer = null;
            while (true)
            {
                //clear it out for the start of each iteration
                buffer = null;
                //get the data
                buffer = this.GetData();
                //if end-of-input is specified, done!
                if (buffer == null)
                {
                    break;
                }
                string[] outTempBuffer = new string[buffer.Length];
                // process each of the elem
                for (int i = 0; i < buffer.Length; i++)
                {
                    if (buffer[i] != null)
                    {
                        outTempBuffer[i] =
                            webProxy.SomeMethod(buffer[i]);
                    }
                }
                //once we get the current set out, feed into the output dictionary
                this.SetData(outTempBuffer);
            }
            
        }

        /// <summary>
        /// Sets the data.
        /// </summary>
        /// <param name="tempBuffer">The output record.</param>
        private void SetData(string[] tempBuffer)
        {
            //apply lock for mutual exclusion
            setDataSyncMutex.WaitOne();
            try
            {
                for (int i = 0; i < tempBuffer.Length; i++)
                {
                    //check that the key isn't already present before trying to add
                    if (!string.IsNullOrEmpty(tempBuffer[i])
                        && !outputBuffer.ContainsKey(tempBuffer[i]))
                    {
                        //add if not present
                        outputBuffer.Add(tempBuffer[i],tempBuffer[i]);
                    }
                }

            }
            finally
            {
                //release lock
                setDataSyncMutex.ReleaseMutex();
            }
        }
    }
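
For comparison, on .NET versions that include the Task Parallel Library, the same flow can be written with less hand-rolled locking. This is only a sketch of an alternative technique, not the approach used above: it swaps the explicit threads and mutexes for `Parallel.ForEach` and a `ConcurrentDictionary`. The class name `ParallelCallSketch` and the `callService` delegate (a stand-in for the web proxy call) are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical illustration class; not part of the sample in this post.
public static class ParallelCallSketch
{
    // callService is an assumed stand-in for the web proxy method.
    public static IDictionary<string, string> ProcessRecords(
        IEnumerable<string> records, int maxThreads, Func<string, string> callService)
    {
        // Thread-safe dictionary, so no explicit output lock is needed.
        var output = new ConcurrentDictionary<string, string>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxThreads };

        // Parallel.ForEach returns only after every record has been processed,
        // which plays the role of joining the worker threads.
        Parallel.ForEach(records, options, record =>
        {
            string result = callService(record);
            if (!string.IsNullOrEmpty(result))
            {
                // As in the sample above, the result is both key and value,
                // and a duplicate result is silently skipped.
                output.TryAdd(result, result);
            }
        });

        return output;
    }
}
```

A call such as `ParallelCallSketch.ProcessRecords(records, 4, r => proxy.SomeMethod(r))` would then take the place of constructing the service and calling `ProcessRecords()`, assuming the proxy instance is safe to share across threads.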