Поделиться через


Blocking queues

In many concurrent systems, one thread performs some work, the result of which another thread consumes. This producer/consumer pattern is frequently implemented on top of blocking queues.

If you examine the behavior of System.Collections.Queue and System.Collections.Generic.Queue<T>, you’ll find that they both throw InvalidOperationException from the Dequeue method (which attempts to remove and return an item from the queue) if there are no items in the Queue (and, thus, there’s nothing to be removed). For a producer/consumer scenario, that behavior is typically suboptimal. A producer creating results can store those results into a queue, and a consumer can retrieve those results from the queue in order to process them. But for a concurrent application, where the producer and consumer are executing on separate threads (and where there might be multiple instances of both the producers and consumers executing simultaneously), synchronization mechanisms are necessary on top of queue. More specifically, typically a design calls for a consumer to block until an item is available for it to consume. Rather than attempting to reimplement such blocking functionality every time we need it, we can wrap it up into a new type:

 class BlockingQueue<T> : IDisposable
{
    private Queue<T> _queue = new Queue<T>();
    private Semaphore _semaphore = new Semaphore(0, int.MaxValue);

    public void Enqueue(T data)
    {
        if (data == null) throw new ArgumentNullException(“data”);
        lock (_queue) _queue.Enqueue(data);
        _semaphore.Release();
    }

    public T Dequeue()
    {
        _semaphore.WaitOne();
        lock (_queue) return _queue.Dequeue();
    }

    void IDisposable.Dispose()
    {
        if (_semaphore != null)
        {
            _semaphore.Close();
            _semaphore = null;
        }
    }
}

BlockingQueue<T> contains two private fields and exposes two public methods (plus an implementation of IDisposable to allow for timely cleanup). The public methods mimic those of Queue<T>: Enqueue, accepting the data of type T to be enqueued, and Dequeue, returning an item of type T. Enqueue has the same semantics as Queue<T>, except BlockingQueue<T>.Enqueue is thread-safe (Queue<T>.Enqueue is not). Dequeue, however, has very different semantics from both Queue.Dequeue and Queue<T>.Dequeue. In addition to being thread-safe (which the other Dequeue methods are not), it also blocks until it knows for sure it'll be able to dequeue an item from the queue.

The first of the two private methods is fairly self-explanatory. The _queue member of type Queue<T> is the actual queue that's storing the data. Any calls to BlockingQueue<T>.Enqueue or BlockingQueue<T>.Dequeue delegate to the corresponding methods on _queue. However, as you can see a few additional things are taking place. First, a monitor is used to synchronize access to the queue. But more importantly, a counting semaphore is used to keep track of the number of items that are available to be removed from the queue. The semaphore starts with a count of 0, which means that any calls to Dequeue will block. Every call to Enqueue increments the semaphore's counter, allowing another non-blocking call to Dequeue.

This, of course, isn't the only way to build a BlockingQueue, nor do we absolutely require the System.Threading.Semaphore class. For example, you can implement a semaphore in purely managed code with a simple class like the following:

 public class ManagedSemaphore
{
    private int _count;
    private object _lock;

    public ManagedSemaphore(int initialCount)
    {
        if (initialCount < 0) throw new ArgumentOutOfRangeException("initialCount", "Initial count must be >= 0.");
        _count = initialCount;
        _lock = new object();
    }

    public void WaitOne()
    {
        lock (_lock)
        {
            while (_count <= 0) Monitor.Wait(_lock);
            _count--;
        }
    }

    public void Release()
    {
        lock (_lock)
        {
            _count++;
            Monitor.Pulse(_lock);
        }
    }
}

Like System.Threading.Semaphore, this ManagedSemaphore class exposes a WaitOne and a Release method. Internally, it has two member fields, an integer count and an object used for a monitor lock. The Release method aquires the lock, and while holding it, increments the internal count and then signals a single thread that might be waiting on the same lock that it should wake up and attempt to retrieve an item (if no thread is waiting, the next time one does call WaitOne, it'll see a positive count). The WaitOne method similarly takes the lock. While holding the lock, it checks the internal count. If the count is positive, it simply decrements the count and returns. However, if the count is 0, it uses the Monitor.Wait method to temporarily release the lock and wait for it to be signaled by another thread (which will happen when another thread calls Release).

We could substitute this ManagedSemaphore for Semaphore in our BlockingQueue<T>. However, given that this is all managed code, we could also incorporate the semaphore's implementation directly into BlockingQueue<T>:

 class BlockingQueue<T> : IEnumerable<T>
{
    private int _count = 0;
    private Queue<T> _queue = new Queue<T>();

    public T Dequeue()
    {
        lock (_queue)
        {
            while (_count <= 0) Monitor.Wait(_queue);
            _count--;
            return _queue.Dequeue();
        }
    }

    public void Enqueue(T data)
    {
        if (data == null) throw new ArgumentNullException("data");
        lock (_queue)
        {
            _queue.Enqueue(data);
            _count++;
            Monitor.Pulse(_queue);
        }
    }
}

While nothing official, in a few basic tests this does show some performance improvements over the original BlockingQueue<T> implemented using System.Threading.Semaphore, clocking in at around 3x the speed.

We can further extend BlockingQueue<T> to implement IEnumerable<T>, making it easy to consume all of the items "in" the queue from a foreach loop:

 class BlockingQueue<T> : IEnumerable<T>
{
    private int _count = 0;
    private Queue<T> _queue = new Queue<T>();

    public T Dequeue()
    {
        lock (_queue)
        {
            while (_count <= 0) Monitor.Wait(_queue);
            _count--;
            return _queue.Dequeue();
        }
    }

    public void Enqueue(T data)
    {
        if (data == null) throw new ArgumentNullException("data");
        lock (_queue)
        {
            _queue.Enqueue(data);
            _count++;
            Monitor.Pulse(_queue);
        }
    }

    IEnumerator<T> IEnumerable<T>.GetEnumerator()
    {
        while (true) yield return Dequeue();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return ((IEnumerable<T>)this).GetEnumerator();
    }
}

This allows code like the following:

 BlockingQueue<int> _queue = new BlockingQueue<int>();
...
foreach(int item in _queue)
{
    ...
}

Note that, as written, any code after the foreach loop won't actually execute, since the foreach is, for all intents and purposes, an infinite loop. The implementation of the enumerator (as implemented by the C# compiler) will block on _queue.Dequeue until another thread Enqueues an item into _queue. If you want to follow a pattern like this, unless you actually do want an infinite loop, you can use a break statement to get out of it if some condition is true. For example, if you only wanted the first five items, you could modify the loop as follows:

 int count=0;
foreach(int item in _queue)
{
    ...
    if (5 == ++count) break;
}

Comments

  • Anonymous
    April 12, 2006
    In my last post, I took a look at implementing blocking queues in .NET using semaphores (both System.Threading.Semaphore...

  • Anonymous
    August 28, 2006
    Hi,

    I tried using the Blocking Queue example using semaphore, I just converted the code from C# to VB.NET , It works fine but with frequent Enqueue and Dequeue there is a block during Dequeuing the data from the queue which consumes CPU Cycle and Performance is hit badly.I am new to Threading and Queuing stuff and dont understand whats going wrong

    Thanks and Regards
    Arun
    You can reply me back at arunv.vijay@gmail.com

  • Anonymous
    August 29, 2006
    If there are no items in the queue, it's meant to block during the dequeue operation (that's the point of this implementation).  But a monitor wait should not consume CPU resources; how did you come to the conclusion that it is doing so?

  • Anonymous
    August 15, 2007
    Very cool, just wondered how you would implement Teardown? e.g A thread is blocked within the foreach, there are no items in the queue. Some Business logic what's the thread to quit, but its bloked in the foreach?

  • Anonymous
    August 16, 2007
    It'd be a bit more difficult to do with the version that uses a managed-only semaphore implementation, but it'd be relatively easy to do with the version that uses System.Threading.Semaphore.  Basically, you'd have an additional internal WaitHandle, probably a ManualResetEvent, that would be initialized to non-signaled; let's call it _cancel.  Then, in Dequeue, instead of calling _semaphore.WaitOne, you'd call WaitHandle.WaitAny(new WaitHandle[]{_semaphore, _cancel}). You'd add a Cancel method or Teardown or whatever you want to call it, and all it does is signal the _cancel WaitHandle.  Then, when WaitAny returns, you check to see whether it was the semaphore that was signaled or the cancel wait handle, based on the return value from WaitAny.  If it was the semaphore, you do the logic shown; if it was the cancel, you bail out using whatever programming model you prefer (an exception, or changing the signature to return a boolean with an out parameter for the data, etc.)  You could be fancier than this, but that's the basic idea.

  • Anonymous
    November 28, 2007
    In my last post , I took a look at implementing blocking queues in .NET using semaphores (both System.Threading.Semaphore

  • Anonymous
    March 26, 2008
    does this work in the Compact Framework 2.0?

  • Anonymous
    February 03, 2009
    Won't your implementation of ManagedSemaphore cause a deadlock when one thread is waiting and another releases?

  • Anonymous
    February 03, 2009
    Why do you believe it will cause a deadlock?  Can you outline the steps that would cause that to happen?

  • Anonymous
    August 13, 2009
    The .NET Compact Framework has limitation over the full framework. For example, Semaphores amd Monitor.Pulse are not available. What is available is ...    System.Threading.AutoResetEvent    System.Threading.EventResetMode    System.Threading.EventWaitHandle    System.Threading.ManualResetEvent    System.Threading.Mutex    System.Threading.WaitHandle How would you implement based on this. It would be great if you coudl provide an update or new blog. Thanks, Liam

  • Anonymous
    August 13, 2009
    The comment has been removed

  • Anonymous
    August 13, 2009
    Thanks for the reply. I managed too look it over. I have run into another limitation of the CF. It does not have :- WaitHandle.SignalAndWait() The CF limitations are really causing me problems! Liam

  • Anonymous
    August 26, 2009
    I had the same problem with missing the semaphore in .NET CF. So I basically created one myself with the use of ManualResetEvents. It seems to work quite well as far as I tested it. Here is what I got, any comment would be appreciated:  /// <summary>    /// Tries to implement a very simple semaphore (without limit) as this is not available in .NET CF    /// </summary>    class Semaphore    {        private Queue<ManualResetEvent> queue;        private int counter;        private object _lock=new Object();        private bool closing;        public Semaphore(int counter)        {            this.counter = 0;            closing=false;            queue = new Queue<ManualResetEvent>();        }        public void WaitOne()        {            while (true)            {                lock (_lock)                {                    if (counter > 0 || closing)                        break;                }                ManualResetEvent mre = new ManualResetEvent(false);                lock (_lock)                {                    queue.Enqueue(mre);                }                mre.WaitOne();            }            counter--;        }        public void Release(int count)        {            lock (_lock)            {                for (int i = 0; i < count; i++)                {                    counter++;                    if (queue.Count > 0)                    {                        ManualResetEvent mre = queue.Dequeue();                        mre.Set();                    }                }            }        }        public void Release()        {            Release(1);        }        public void ReleaseAll()        {            Release(queue.Count);        }        public void Close()        {            closing = false;            ReleaseAll();        }    }

  • Anonymous
    May 06, 2010
    There is no need for the _count variable. You can piggy back on the Queue.Count property. It cleans up the implementation a little bit.

  • Anonymous
    May 07, 2010
    Thanks, Brian.  Yes, that's true... I was simply trying to keep the example consistent with the previous ManagedSemaphore example.

  • Anonymous
    May 13, 2010
    Hi, I have a blocking queue implemented similar to what you posted here. It works fine but now I need to tweek it a little bit. I need that my dequeue method to give me the data if a condition is met (ex: data timestamp<current time). If that condition is not met wait untill is met or untill I have another piece of data in the queue that mets the condition. I am thinking that instead of my queue to use a sorded dictionary with the timestamp as key, so that my data is corectly sorted .. but I have no ideea how to make it wait untill the data is good to be processed. Did you ever do something similar ? or have any ideea how this can be implemented ?

  • Anonymous
    November 21, 2013
    The comment has been removed

  • Anonymous
    November 21, 2013
    The comment has been removed