The concurrent_queue Container in VS2010
Now that Visual Studio Beta2 has been available for a few weeks (see Rick’s earlier blog post) I hope you’ve had a chance to experiment with one or both of the concurrent containers that were introduced: concurrent_queue<T> and concurrent_vector<T>. These two containers are lock-free, and are used to avoid synchronization bottlenecks in parallel algorithms. As with all concurrency features, they come with a set of pitfalls that must be avoided. In this post I will focus on the concurrent_queue, its design, proper usage of its methods, and its use in some real scenarios.
From std::queue to concurrent_queue
It is enlightening to compare the concurrent_queue design against that of the standard C++ std::queue, the latter which is, of course, not concurrency-safe. The basic API of std::queue’s enqueue/dequeue functionality can be summarized as follows:
template<class T>
class queue
{
public:
void push(const T&);
size_type size() const;
T& front();
void pop();
};
Forgetting for the moment that std::queue, as implemented, is not concurrency-safe, we can first note that the enqueue portion of this API could be concurrency-safe. Enqueuing an item can be accomplished with a single call to push(), and this operation is atomic in that it is performed by a single method call, which means that the enqueued item will be added to the queue by the time push() returns.
However, the dequeue portion of this API is not, and cannot be, concurrency-safe because the dequeue operation on std::queue is not atomic; it requires three separate sub-operations: size(), front(), and pop().
if (q.size() > 0)
{
T x = q.front();
q.pop();
}
To safely dequeue an item we must first check to see if the queue is empty, to avoid dequeueing from an empty queue. Then we have to retrieve the value from the front of the queue. After we have the front item, we need to remove it from the queue by calling pop(). Concurrent enqueue and dequeue operations that occur after the call to size(), or after the call to front(), can result in unexpected behaviour. Here, for example, are three potential races that could play out:
- The internal implementation of std::queue could be corrupted, which would result in undefined behaviour, likely a crash.
- The result of (q.size() > 0) cannot be trusted if a concurrent push() or pop() occurs immediately afterward. If the queue was empty (q.size() == 0), a concurrent push could occur immediately after the check, which would cause the rest of this dequeue operation to be skipped when in fact there is an item on the queue. If the queue had a single element in it (q.size() == 1), a concurrent pop could occur immediately after the check, causing the queue to become empty; this pop() would fail because the queue is now empty, which in debug-mode would throw an assert dialog.
- Two threads might dequeue simultaneously. The first thread could retrieve the front item, and then get interrupted. The second thread could then run and retrieve the front item, which is the same item as the first thread. Assuming nothing else bad happens (see #1), both threads will have dequeued the same item.
Why is std::queue designed this way? Imagine that front() and pop() were combined into a single method and invoked as:
T x = q.pop(); // Hypothetical API
If the copy-assignment operator in type “T” throws an exception, the value dequeued would be lost forever. By separating the retrieval of the item from the queue from its removal from the queue, users can potentially recover from this situation. This separation makes std::queue’s API exception-safe, but concurrency-unsafe.
As we’ll soon see, concurrent_queue makes the dequeue operation concurrency-safe by making it atomic.
Concurrency-Safe Operations on concurrent_queue
The concurrent_queue is concurrency-safe with respect to the following concurrent operations:
- Enqueues concurrent with enqueues
- Dequeues concurrent with dequeues
- Enqueues concurrent with dequeues
These operations are internally synchronized using a lock-free algorithm. This allows concurrent_queue to perform much better than one implemented using coarse-grained locking.
Enqueue: The enqueue operation on concurrent_queue is straightforward, and identical to that of std::queue:
void push(const T& source)
This will push a source value onto the tail of the queue, synchronizing with other concurrent enqueue/dequeue operations.
Dequeue: Instead of three method calls to achieve a dequeue, the concurrent_queue’s dequeue operation is encapsulated in a single method, try_pop():
bool try_pop(T& destination);
Note that the name of the method is now try_pop(), which as its name implies, attempts to pop an item from the head of the queue. If the dequeue was successful, the dequeued value is stored in the “destination” parameter, and this method returns true.
We’ve seen that a separate check for queue emptiness prior to popping is subject to races. The concurrent_queue try_pop() method solves this issue by simply returning false if we attempted to pop from an empty queue. Note that this is not a failure, nor does it necessarily indicate an error in the program that calls it. Frequently, the correct action for callers of try_pop() is to retry when false is returned. I’ll talk about this further down.
I mentioned that std::queue’s dequeue operation was exception-safe but not concurrency-safe. Here, concurrent_queue’s dequeue operation is concurrency-safe, but not exception-safe. If the assignment operator of type “T” throws an exception, the dequeued value will be lost.
Concurrency-Unsafe Operations on concurrent_queue
The methods discussed here are not safe to call during concurrent push() and try_pop() operations. They are meant to be used only after all concurrent operations have completed.
empty: This method returns true if the container has no elements:
bool empty() const;
While this method is technically thread-safe (it won’t corrupt the state of the concurrent_queue), the value it returns isn’t terribly useful because the returned value might immediately become incorrect if a concurrent push() or try_pop() happens.
unsafe_size: As is clearly apparent from its name, this method is concurrency-unsafe.
size_type unsafe_size() const;
This method can produce incorrect results if it is called concurrently with push() and try_pop() operations. To understand why, it is necessary to understand a bit about how the concurrent_queue operates under the covers. There are 2 member variables in concurrent_queue that demarcate a sliding window that keeps track of the number of elements enqueued and dequeued through its lifetime: _Tail_counter and _Head_counter. When an element is enqueued, _Tail_counter is incremented. When an element is dequeued, _Head_counter is incremented. The number of elements in the queue at any given time is (_Tail_counter - _Head_counter), and indeed this formula expresses exactly how unsafe_size() is computed. Now consider the following case:
- Assume for this example that _Tail_counter is 12, and _Head_counter is 10. The queue has 2 elements.
- Thread 1 calls unsafe_size(). It gets as far as fetching the value of “_Tail_counter” before it gets pre-empted. It has fetched the value 12.
- Thread 2 performs 5 push() operations followed by 5 successful try_pop() operations. _Tail_counter will now be 17, while _Head_counter will now be 15. The queue still has 2 elements.
- Thread 1 resumes and finishes computing unsafe_size(). It fetches the value of _Head_counter, which is 15. It is now going to subtract the new value of _Head_counter (15) from the old value of _Tail_counter (12) and return (12 – 15), or -3 as the queue size. But wait! The concurrent_queue’s “size_type” is an unsigned type, which means the actual value will be 4294967293, or, if you prefer, really really huge.
clear: It is not concurrency-safe to clear out the contents of the concurrent_queue during concurrent operations:
void clear();
If, after all concurrent operations are complete, you wish to ensure that the concurrent_queue is empty and that all its elements are destructed, you can call this method. The concurrent_queue destructor will also implicitly clear out all its elements.
Iteration: Iteration over a concurrent_queue is not thread-safe and all methods that return iterators are explicitly prefixed with the word “unsafe_”.
iterator unsafe_begin();
iterator unsafe_end();
const_iterator unsafe_begin() const;
const_iterator unsafe_end() const;
Iterating while concurrent push() and try_pop() operations are happening will yield undefined results (e.g. crashing). However, for debugging it can be useful to traverse any remaining elements in the concurrent_queue and dump them out, and that’s what these methods are for.
Using concurrent_queue for Producer-Consumer
The concurrent_queue is an ideal data structure for producer-consumer scenarios. In the following simplistic example, we schedule one task that pushes (produces) 1000 integers, and the main thread pops (consumes) those integers:
// Task that produces 1000 items
void ProducerTask(void* p) {
concurrent_queue<int>* pq = (concurrent_queue<int>*)p;for (int i = 0; i < 1000; ++i)
pq->push(i);
}void Producer_Consumer() {
concurrent_queue<int> q;// Schedule a task to produce 1000 items
CurrentScheduler::ScheduleTask(ProducerTask, &q);// Consume 1000 items
for (int i = 0; i < 1000; ++i) {
int result = -1;while (!q.try_pop(result))
Context::Yield();assert(result == i);
}
}
Note that the consumer must retry (spin) if the try_pop() operation fails. Since this simple example guarantees that the producer will enqueue 1000 items, a failure in the dequeue operation simply means that the consumer thread is outpacing the producer thread, and eventually the producer thread will catch up and enqueue an item. When retrying in this way, it is very important to call Context::Yield() inside the spin-loop, which cooperatively yields to any other runnable tasks, allowing the producer task an opportunity to run. To see why this is important, imagine running this code on a single-core machine. Without the Context::Yield(), the consumer will busy-wait for an item to appear on the queue, starving out the producer task. If the producer task is starved, the process live-locks.
Debugger Visualizations for concurrent_queue
The internal implementation data structures for concurrent_queue are very complex. If you want to explore the internals of it, have fun. However, when debugging a program, developers simply want to see the contents of the concurrent_queue, not its internal data structures. In Beta2 we have included debugger visualizers for concurrent_queue (and concurrent_vector) so that they appear like their corresponding STL data structures in the debugger’s watch window. Here’s a sample of what the Visual Studio debugger’s watch window would look like for a concurrent_queue<int> that contains the three elements 5, 7, and 9:
Summary
The concurrent_queue container was adapted from the concurrent_queue in Intel’s Threading Building Blocks (and a big “thank you” goes to Intel for collaborating with us and allowing us to modify their implementation). If you’re familiar with the TBB concurrent_queue, you’ll note quite a few API differences, mainly because TBB’s queue supports bounded/blocking operations. Intel and Microsoft collaborated on a new, non-blocking concurrent_queue, and you’ll see TBB’s concurrent_queue start to conform to that of the Concurrency Runtime starting in TBB 2.2.
We will soon follow up with another post that talks about concurrent_vector.
Comments
Anonymous
November 24, 2009
In the Producer-Consumer example, why does ProducerTask() take a void* as parameter? Is this necessary, or would concurrent_queue<int>* have worked?Anonymous
November 24, 2009
ProducerTask takes a void* as a parameter because it is using the ScheduleTask API which takes a void* for the data and requires you to dereference that data; ScheduleTask is similar to a CreateThread or ThreadPool API in that respect. If the example had used a message block like Concurrency::call<T> or a task_group then this wouldn't have been necessary.Anonymous
December 10, 2009
I do not understand how a single call to function push() could be concurrency safe. Thread-A will have the old copy of the queue size, if Thread-B increments the queue size inside the push() just after Thread-A reads the old value for the queue size.Anonymous
December 11, 2009
The concurrent_queue size has nothing to do with the push operation. Indeed as I indicated, computing the queue size is not a concurrency-safe operation. In that discussion above, I mentioned that the queue keeps two counters going: _Tail_counter is incremented for every push(), and _Head_counter is incremented for every successful try_pop(). At no time during the push() operation does the queue ever try to maintain a consistent view of the queue's size. As far as push() is concerned, it only needs to exclusively reserve a "slot" into which the item will be enqueued, and it does this by atomically incrementing the _Tail_counter. A concurrent push() operation on another thread will never reserve the same slot, so the _Tail_counter will always accurately reflect the number of push() operations, those that have completed as well as those that are still in flight.