Freigeben über


Throttling using managed semaphores and named monitors

Firstly I would like to explain what is a managed semaphore. This was introduced to me during a session by Andy Clymer (developmetor). Basically the requirement of a semaphore is pretty clear but then again we don't have to resort to kernel objects and mutexes if not required to step out from our little managed world. Hence the concept of managed semaphore. Basically the idea is to allow only a certain number of threads to any critical section. This enables the number of parallel threads executing a block of code to the max count of the semaphore.

Let look at a simple piece of code doing this.

We would like to do something like this.

 

semaphore.Acquire()

try{

... // throttled code

...

}

finally{

semaphore.Release();

}

Now lets see a class that would enable use to do something like this.

public class Semaphore   {       private int current;       private int max;       private object semLock = new object();

      public Semaphore(int max)       {           this.current = 0;           this.max = max;       }

      public void Acquire()       {           bool acquired = false;

          lock (semLock)           {               Debug.Assert(current <= max, "Current has gone out of range..");

              while (current >= max)               {                   Monitor.Wait(semLock);               }

              current++;               acquired = true;               Debug.Assert(current <= max, "Current has gone out of range..");           }       }

      public void Release()       {           lock (semLock)           {               Debug.Assert(current <= max, "Current has gone out of range..");

              current--;

              // Mark a single thread that has called Monitor.Wait(semLock)               // as ready to run...               // only run it when it can be granted the semLock Monitor.               Monitor.Pulse(semLock);

              Debug.Assert(current >= 0, "Current has gone out of range..");

          }       }   }

 

Now that we have a class that can enable throttling we need to figure out how to enable resource based throttling.

This implementation would suffice if the resource itself is capable of parallelism. Assume writing to a file or a port. Now we sometimes have scenarios where only one thread can write to a file. But we might need multiple threads to write to different files/ports. Think of a simple processor where we are sending out messages to different ports but we can have only on thread sending out messages to a particular port.

Hence we have the resource itself usage mutually exclusive but different instances of the resources can be accessed in parallel.Now to write code like this would could end up writing double locks which is quite messy code in itself and can end up in deadlocks if you don't acquire locks in the correct sequence. Would like to call out the fork and knifes problem. (lets leave this for another post).

Thinking about this lead me to write a named semaphore. Basically the idea was to keep a dictionary of the resources currently accessed.

If a resource makes a request we give it a lock. This is under the condition that there is no other thread owning the current resource. This also checks if the current number of locks obtained are <= the max count hence checking our throttling count. Basically to put it in a nutshell. A thread can acquire a lock if there are enough number of available semaphores and there is no parallel thread who has acquired the lock for the current request.

To put this down at one shot we can just rewrite the semaphore class to have a dictionary and a count. and this is what I ended up with.

 

public class KeyBasedSemaphore<T> where T : struct {     Dictionary<T, object> locks = new Dictionary<T, object>();     object thisLock = new object();     int max = 0;     int current = 0;

    public KeyBasedSemaphore(int maxCount)     {         this.max = maxCount;     }

    public void Acquire(T key)     {         lock (thisLock)         {             Debug.Assert(current <= max, "Current has gone out of range..");

            bool acquired = false;             while (locks.ContainsKey(key))             {                 Monitor.Wait(thisLock);             }

            locks.Add(key, new object());

            //Wait if there are already max             //number of instances running.             while (current >= max)             {                 Monitor.Wait(thisLock);             }

            acquired = true;             current++;             Debug.Assert(current <= max, "Current has gone out of range..");         }     }

    public void Release(T key)     {         lock (thisLock)         {             Debug.Assert(current <= max, "Current has gone out of range..");

            //Remove the lock on the instance and             //Decrement the counter.             locks.Remove(key);             current--;

            //Pulse any waiting processes.             Monitor.Pulse(thisLock);

            Debug.Assert(current >= 0, "Current has gone out of range..");         }     } }

 

OK I admit its dirty in its own sweet way. I send this and pretty much got a request to refractor it. Basically Andy gave a very clear suggestion and asked me as the code would become clearer it I split out the intent. Primarily what we needed to do was the separate the semaphore with the named set of locks.

This is primarily what we need to achieve.

 

Obtained a named monitor ( name = file or port etc )

Obtain the Semaphore

Do Activity

Release Semaphore

Exit named monitor

 

 

Well this is from Andy

"I would therefore be inclined not to modify the Semaphore class to produce the required behaviour but to create two new classes..Produce a three class solution using the original semaphore class, and then a class to encapsulate the new synchronization pattern and a second to implement named monitor functionality using a monitor. 

As for performance, in the implementation you supplied If a thread is currently in a wait state waiting for access to the “port/file”  it is being woken up every time a release is made irrespective if its for the resource it is interested in, by refactoring as above you could  make it so they only woken up when the actual resource they are after becomes available.  Having said that my way means using more objects for synchronization and that cost may out way any benefit..It will almost certainly depend on the behaviour of the application logic, if there is high contention for a given resource I think my way would be more optimal if not then using a single synchronization object may very well be quicker..

So unless performance is absolutely crucial I still like the idea of keeping semaphore behaviour and the exclusive access behaviour in two separate classes, wrapped by a third to implement your required synchronization pattern, because it makes the intent clearer and allows the re-use of common components."

 

And finally the the named monitor class that Andy refactored it out to.

 

class NamedMonitorCollection<K>    {        private class NamedMonitor        {            public int usageCount = 0;        }

       private Dictionary<K, NamedMonitor> namedMonitors = new Dictionary<K, NamedMonitor>();

       public void Enter(K name)        {            NamedMonitor monitorToAquire = null;

           lock (namedMonitors)            {                // Is named monitor currently in use, if not create an entry                if (namedMonitors.ContainsKey(name) == false)                {                    namedMonitors[name] = new NamedMonitor();                }

               monitorToAquire = namedMonitors[name];

               // Register threads interest in this monitor                monitorToAquire.usageCount++;            }

           // Attempt to aquire the appropriate monitor            Monitor.Enter(monitorToAquire);        }

       public void Exit(K name)        {            lock (namedMonitors)            {                Debug.Assert(namedMonitors.ContainsKey(name), "This monitor is unknown");

               Monitor.Exit(namedMonitors[name]);

               // Un register interest in  the named monitor                // if no interested parties then remove it from the list                // of known named monitors                namedMonitors[name].usageCount--;                if (namedMonitors[name].usageCount == 0)                {                    namedMonitors.Remove(name);                }            }        }

       /// <summary>        /// IEnumerable method        /// </summary>        /// <returns></returns>        internal IEnumerable<K> GetMonitorsInUse()        {            lock (namedMonitors)            {                foreach (K monitor in namedMonitors.Keys)                {                    yield return monitor;                }            }        }

       public int inUseCount        {            get            {                lock (namedMonitors)                {                    return namedMonitors.Count;                }            }        }

   }

 

Now if we look at the code above we are surely making more locks but the code is much more clearer. The overheads can come into play only if we have a large number of resources waiting for threads and this would end up in a large number of locks and contentions. On the other hand the first implementation still works with only one lock and hence might have minor improvement. Then again we can write everything in assembly.

 

Hope this might help in some ideas in doing interesting parallel processing in a purely managed world :)

Comments