Bounded blocking queues

In my last post, I looked at implementing blocking queues in .NET using semaphores (both System.Threading.Semaphore and a managed semaphore implementation).  I defined a blocking queue as one whose dequeue operation blocks until an item can be removed, rather than throwing an exception when the queue is empty, as System.Collections.Queue and System.Collections.Generic.Queue<T> do.  However, there are other types of blocking queues.  For example, a bounded blocking queue has the same blocking dequeue functionality, but it also has an upper bound on the number of items it can hold.  Once that bound is reached, any enqueue operation blocks until space becomes available, which happens when an item is dequeued.  This deviates from the behavior of Queue and Queue<T>, where the underlying storage is simply expanded to accommodate more items (assuming enough contiguous memory is available for the operation).

Starting with my original BlockingQueue<T> implementation, I can create a BoundedBlockingQueue<T> with only a few more lines of code and an extra Semaphore member:

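using System;
using System.Collections.Generic;
using System.Threading;

class BoundedBlockingQueue<T> : IDisposable
{
    private Queue<T> _queue;
    // _itemsAvailable counts items ready to dequeue; _spaceAvailable counts free slots.
    private Semaphore _itemsAvailable, _spaceAvailable;

    public BoundedBlockingQueue(int size)
    {
        if (size <= 0) throw new ArgumentOutOfRangeException("size");
        _itemsAvailable = new Semaphore(0, size);    // the queue starts empty
        _spaceAvailable = new Semaphore(size, size); // every slot starts free
        _queue = new Queue<T>(size);
    }

    public void Enqueue(T data)
    {
        // For value types this comparison is always false, so only null references are rejected.
        if (data == null) throw new ArgumentNullException("data");
        _spaceAvailable.WaitOne();           // block until a slot is free
        lock (_queue) _queue.Enqueue(data);  // Queue<T> itself is not thread-safe
        _itemsAvailable.Release();           // signal that an item is ready
    }

    public T Dequeue()
    {
        T item;
        _itemsAvailable.WaitOne();           // block until an item is ready
        lock (_queue) item = _queue.Dequeue();
        _spaceAvailable.Release();           // signal that a slot is free
        return item;
    }

    void IDisposable.Dispose()
    {
        if (_itemsAvailable != null)
        {
            _itemsAvailable.Close();
            _spaceAvailable.Close();
            // Clear both references so a second Dispose call is a no-op.
            _itemsAvailable = null;
            _spaceAvailable = null;
        }
    }
}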

BoundedBlockingQueue<T> maintains three member variables.  The first is the underlying Queue<T> that stores the data.  The other two are System.Threading.Semaphore instances, one representing the number of items that are available for dequeuing, and one representing the number of open slots in the queue that can be filled through enqueue operations.

When the class is constructed, the queue is initialized with an initial capacity of the specified queue bounding size (the maximum number of items that can be in the queue).  The _itemsAvailable Semaphore is initialized with an internal count of 0 (there are no items currently in the queue), and the _spaceAvailable Semaphore is initialized with an internal count equal to the bounding size (there are no items currently in the queue, so every slot can be filled).

An Enqueue operation waits for space to become available by calling _spaceAvailable.WaitOne.  Once space is available, the item is added to the underlying Queue<T>, and _itemsAvailable.Release is used to signal that there is an item in the queue available for someone to dequeue.

A Dequeue operation does the opposite.  It first waits for an item to become available by calling _itemsAvailable.WaitOne.  When an item is available, it dequeues the item from the underlying queue, and before returning it, it calls _spaceAvailable.Release to signal that there is a free slot in the queue for someone to enqueue an item into.
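The handshake between the two semaphores can be seen in isolation, without the queue itself.  The following is a minimal sketch; the SemaphoreHandshakeDemo class and its Run method are hypothetical names introduced for illustration, and WaitOne(0) is used only to probe whether a wait would block.

```csharp
using System;
using System.Threading;

// Hypothetical helper for illustration; not part of BoundedBlockingQueue<T>.
static class SemaphoreHandshakeDemo
{
    // Returns true if the semaphore counts behave as described in the text.
    public static bool Run()
    {
        const int size = 2;
        using (var itemsAvailable = new Semaphore(0, size))
        using (var spaceAvailable = new Semaphore(size, size))
        {
            // Two simulated enqueues: take a free slot, then publish an item.
            spaceAvailable.WaitOne(); itemsAvailable.Release();
            spaceAvailable.WaitOne(); itemsAvailable.Release();

            // The "queue" is now full, so a third enqueue would block.
            // A zero timeout returns false immediately instead of waiting.
            bool thirdEnqueueWouldBlock = !spaceAvailable.WaitOne(0);

            // One simulated dequeue: take an item, then free its slot.
            itemsAvailable.WaitOne(); spaceAvailable.Release();

            // A slot is free again, so this wait succeeds immediately.
            bool slotFreedByDequeue = spaceAvailable.WaitOne(0);

            return thirdEnqueueWouldBlock && slotFreedByDequeue;
        }
    }
}
```

At every point the two counts sum to at most the bounding size, which is what keeps the underlying queue from ever growing past it.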

Pretty straightforward.  Of course, as with the BlockingQueue<T> implementation based on System.Threading.Semaphore, I've made some performance tradeoffs in the name of simplicity.  BoundedBlockingQueue<T> is arguably worse: every Enqueue and Dequeue operation requires a WaitOne call on one Semaphore and a Release call on the other, as well as a Monitor lock.  For BlockingQueue<T>, I demonstrated one way the reliance on Semaphore could be removed.  The same is possible here: BoundedBlockingQueue<T> can be rewritten to rely only on Monitor for synchronization, with no use of System.Threading.Semaphore.  Care to try?
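One possible answer to that exercise, offered as a minimal sketch rather than a definitive implementation: the class name MonitorBoundedBlockingQueue<T> and the _size field are assumptions introduced here, and the sketch favors simplicity over throughput by using a single lock object and Monitor.PulseAll.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical name; a Monitor-only variant of the bounded blocking queue.
class MonitorBoundedBlockingQueue<T>
{
    private readonly Queue<T> _queue;
    private readonly int _size; // maximum number of items the queue may hold

    public MonitorBoundedBlockingQueue(int size)
    {
        if (size <= 0) throw new ArgumentOutOfRangeException("size");
        _size = size;
        _queue = new Queue<T>(size);
    }

    public void Enqueue(T data)
    {
        if (data == null) throw new ArgumentNullException("data");
        lock (_queue)
        {
            // Recheck the condition in a loop after every wakeup, since another
            // thread may have filled the slot before this one reacquired the lock.
            while (_queue.Count >= _size) Monitor.Wait(_queue);
            _queue.Enqueue(data);
            // Producers and consumers wait on the same object, so wake them all;
            // a single Pulse could wake a thread that still cannot proceed.
            Monitor.PulseAll(_queue);
        }
    }

    public T Dequeue()
    {
        lock (_queue)
        {
            while (_queue.Count == 0) Monitor.Wait(_queue);
            T item = _queue.Dequeue();
            Monitor.PulseAll(_queue);
            return item;
        }
    }
}
```

Because nothing here wraps an operating system handle, this variant does not need to implement IDisposable.  The cost is that PulseAll can wake more threads than can make progress; a more refined version might track waiting producers and consumers separately.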

Comments

  • Anonymous
    April 12, 2006
    Here's an example of a bounded queue built with win32 condition variables:

    http://msdn.microsoft.com/library/default.asp?url=/library/en-us/dllproc/base/using_condition_variables.asp

    This should be easy to convert to Threading.Monitor (WakeConditionVariable -> Pulse, etc).
  • Anonymous
    May 03, 2006
    Very nice implementation! It would be nice to have a 'Close()' method that stopped new additions to the queue, but allowed existing entries to be dequeued until no more remained. This might require mods to the Enqueue() and Dequeue() signatures. Maybe like:
    public bool Enqueue(T data) {...}
    (return true if data queued, false if queue already closed - guess you could also throw an Exception if queue was closed..)
    public bool Dequeue(out T item)
    {
    item = default(T);
    ...
    }
    (returns item, plus true if value obtained from queue, or false if queue is empty)

    Sounds simple, but I think there might be some 'complexity' doing this properly :)

    cheers,
    dave