How to: Synchronize a Producer and a Consumer Thread (C# Programming Guide)
The following example demonstrates thread synchronization between the primary thread and two worker threads using the lock keyword, and the AutoResetEvent and ManualResetEvent classes. For more information, see lock Statement (C# Reference).
The example creates two auxiliary, or worker, threads. One thread produces elements and stores them in a generic queue that is not thread-safe. For more information, see Queue. The other thread consumes items from this queue. In addition, the primary thread periodically displays the contents of the queue, so the queue is accessed by three threads. The lock keyword is used to synchronize access to the queue to ensure that the state of the queue is not corrupted.
In addition to simply preventing simultaneous access with the lock keyword, further synchronization is provided by two event objects. One is used to signal the worker threads to terminate, and the other is used by the producer thread to signal to the consumer thread when a new item has been added to the queue. These two event objects are encapsulated in a class called SyncEvents. This allows the events to be passed easily to the objects that represent the consumer and producer threads. The SyncEvents class is defined like this:
public class SyncEvents
{
    public SyncEvents()
    {
        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}
The AutoResetEvent class is used for the "new item" event because you want this event to reset automatically each time the consumer thread responds to it. In contrast, the ManualResetEvent class is used for the "exit" event because you want multiple threads to respond when this event is signaled. If you used AutoResetEvent instead, the event would revert to a non-signaled state after just one thread responded to it. The other thread would not respond and, in this case, would fail to terminate.
The SyncEvents class creates the two events, and stores them in two different forms: as EventWaitHandle, which is the base class for both AutoResetEvent and ManualResetEvent, and in an array based on WaitHandle. As you will see in the consumer thread discussion, this array is necessary so that the consumer thread can respond to either event.
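The difference between the two reset modes can be observed without any worker threads. The following is a minimal sketch, not part of the original sample: the ResetModeDemo class and its PollTwice method are hypothetical names introduced here. It signals an event once and then checks its state twice with a non-blocking WaitOne(0).

```csharp
using System;
using System.Threading;

// Hypothetical helper class, introduced only for this illustration.
public static class ResetModeDemo
{
    // Signals the event once, then polls it twice without blocking.
    public static bool[] PollTwice(EventWaitHandle handle)
    {
        handle.Set();
        bool first = handle.WaitOne(0);   // a successful wait resets an AutoResetEvent
        bool second = handle.WaitOne(0);  // a ManualResetEvent is still signaled here
        return new bool[] { first, second };
    }

    public static void Main()
    {
        bool[] auto = PollTwice(new AutoResetEvent(false));
        bool[] manual = PollTwice(new ManualResetEvent(false));
        Console.WriteLine("AutoResetEvent:   {0}, {1}", auto[0], auto[1]);     // True, False
        Console.WriteLine("ManualResetEvent: {0}, {1}", manual[0], manual[1]); // True, True
    }
}
```

The second check on the AutoResetEvent fails because the first successful wait consumed the signal, which is the behavior that would leave one of two waiting threads unreleased.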
The consumer and producer threads are represented by classes named Consumer and Producer, both of which define a method called ThreadRun. These methods are used as the entry points for the worker threads that the Main method creates.
The ThreadRun method defined by the Producer class looks like this:
// Producer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    Random r = new Random();
    while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            while (_queue.Count < 20)
            {
                _queue.Enqueue(r.Next(0,100));
                _syncEvents.NewItemEvent.Set();
                count++;
            }
        }
    }
    Console.WriteLine("Producer thread: produced {0} items", count);
}
This method loops until the "exit thread" event becomes signaled. The state of this event is tested with the WaitOne method, using the ExitThreadEvent property defined by the SyncEvents class. In this case the state of the event is checked without blocking the current thread because the first argument used with WaitOne is zero, indicating that the method should return immediately. If WaitOne returns true, then the event in question is currently signaled. If so, the ThreadRun method returns, which has the effect of terminating the worker thread executing this method.
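The polling pattern described above can be isolated in a small sketch. This is an illustration under stated assumptions, not the sample's code: PollingWorker and PollingDemo are hypothetical names, and the Thread.Sleep(1) call stands in for a unit of real work.

```csharp
using System;
using System.Threading;

// Hypothetical worker that polls an exit event between units of work.
public class PollingWorker
{
    private readonly EventWaitHandle _exit;
    private long _iterations;

    public PollingWorker(EventWaitHandle exit) { _exit = exit; }

    public long Iterations
    {
        get { return Interlocked.Read(ref _iterations); }
    }

    public void Run()
    {
        // WaitOne(0) returns immediately: false while the event is
        // unsignaled, true once another thread has called Set.
        while (!_exit.WaitOne(0))
        {
            Interlocked.Increment(ref _iterations);
            Thread.Sleep(1); // placeholder for real work
        }
    }
}

public static class PollingDemo
{
    // Starts the worker, signals the exit event, and reports whether
    // the worker thread terminated within five seconds.
    public static bool RunAndStop()
    {
        ManualResetEvent exit = new ManualResetEvent(false);
        PollingWorker worker = new PollingWorker(exit);
        Thread thread = new Thread(worker.Run);
        thread.Start();
        Thread.Sleep(50);
        exit.Set();
        return thread.Join(5000);
    }

    public static void Main()
    {
        Console.WriteLine("Worker stopped: {0}", RunAndStop());
    }
}
```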
Until the "exit thread" event is signaled, the Producer.ThreadRun method attempts to keep 20 items in the queue. An item is simply a random integer from 0 through 99, because the upper bound passed to Random.Next is exclusive. The collection must be locked before adding new items to prevent the consumer and primary threads from accessing the collection simultaneously. This is accomplished with the lock keyword. The argument passed to lock is the SyncRoot property exposed by means of the ICollection interface. This property is provided specifically for synchronizing thread access. Exclusive access to the collection is granted for any instructions contained in the code block following lock. For each new item that the producer adds to the queue, a call to the Set method on the "new item" event is made. This signals the consumer thread to emerge from its suspended state to process the new item.
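To see the effect of locking on SyncRoot in isolation, consider the following sketch. It is a simplified illustration rather than part of the sample: SyncRootDemo and FillFromTwoThreads are hypothetical names, and two threads only enqueue items instead of producing and consuming.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo: two threads add to one Queue<int> under a shared lock.
public static class SyncRootDemo
{
    public static int FillFromTwoThreads(int itemsPerThread)
    {
        Queue<int> queue = new Queue<int>();
        // Queue<T> implements ICollection.SyncRoot explicitly, hence the cast.
        object sync = ((ICollection)queue).SyncRoot;

        ThreadStart work = delegate
        {
            for (int i = 0; i < itemsPerThread; i++)
            {
                lock (sync)
                {
                    queue.Enqueue(i); // only one thread at a time runs this
                }
            }
        };

        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.Start();
        b.Start();
        a.Join();
        b.Join();
        return queue.Count; // safe to read: both threads have finished
    }

    public static void Main()
    {
        Console.WriteLine(FillFromTwoThreads(10000)); // 20000
    }
}
```

Because every Enqueue call runs inside the lock, no additions are lost and the final count equals the total number of items added by both threads.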
The Consumer object also defines a method called ThreadRun. Like the producer's version of ThreadRun, this method is executed by a worker thread created by the Main method. However, the consumer version of ThreadRun must respond to two events. The Consumer.ThreadRun method looks like this:
// Consumer.ThreadRun
public void ThreadRun()
{
    int count = 0;
    while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
    {
        lock (((ICollection)_queue).SyncRoot)
        {
            int item = _queue.Dequeue();
        }
        count++;
    }
    Console.WriteLine("Consumer Thread: consumed {0} items", count);
}
This method uses WaitAny to block the consumer thread until any of the wait handles in the provided array becomes signaled. In this case there are two handles in the array, one for terminating the worker threads, and one for indicating that a new item has been added to the collection. WaitAny returns the index of the event that became signaled. The "new item" event is the first in the array, so an index of zero indicates a new item. The loop condition checks for an index of 1, which indicates the "exit thread" event, to determine whether the method continues to consume items. If the "new item" event was signaled, the method gets exclusive access to the collection with lock and consumes the new item. Because this example produces and consumes thousands of items, it does not display each item consumed. Instead, Main periodically displays the contents of the queue, as will be demonstrated.
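The index returned by WaitAny can be explored with a short single-threaded sketch. It is an illustration only: WaitAnyDemo and Probe are hypothetical names, and the overload with a zero timeout is used so that each call returns immediately instead of blocking as the consumer does.

```csharp
using System;
using System.Threading;

// Hypothetical demo of the indexes returned by WaitHandle.WaitAny.
public static class WaitAnyDemo
{
    public static int[] Probe()
    {
        AutoResetEvent newItem = new AutoResetEvent(false);
        ManualResetEvent exit = new ManualResetEvent(false);
        WaitHandle[] handles = new WaitHandle[] { newItem, exit };

        // Nothing is signaled: with a zero timeout, WaitAny returns
        // WaitHandle.WaitTimeout instead of blocking.
        int none = WaitHandle.WaitAny(handles, 0);

        newItem.Set();
        int item = WaitHandle.WaitAny(handles, 0); // 0; the wait resets newItem

        exit.Set();
        int stop = WaitHandle.WaitAny(handles, 0); // 1; exit stays signaled

        newItem.Set();
        // Both handles are signaled now. WaitAny is documented to return
        // the smallest index among the signaled handles.
        int both = WaitHandle.WaitAny(handles, 0); // 0

        return new int[] { none, item, stop, both };
    }

    public static void Main()
    {
        int[] r = Probe();
        Console.WriteLine("{0} {1} {2} {3}",
            r[0] == WaitHandle.WaitTimeout ? "timeout" : r[0].ToString(),
            r[1], r[2], r[3]);
    }
}
```

The last case suggests why the array order matters: with "new item" at index 0, a consumer written this way would appear to handle a pending "new item" signal before it observes the "exit thread" event.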
The Main method begins by creating the queue whose contents will be produced and consumed and an instance of SyncEvents, which you looked at earlier:
Queue<int> queue = new Queue<int>();
SyncEvents syncEvents = new SyncEvents();
Next, Main configures the Producer and Consumer objects for use with worker threads. This step creates the Thread objects but does not yet launch the actual worker threads:
Producer producer = new Producer(queue, syncEvents);
Consumer consumer = new Consumer(queue, syncEvents);
Thread producerThread = new Thread(producer.ThreadRun);
Thread consumerThread = new Thread(consumer.ThreadRun);
Notice that the queue and the synchronization event object are passed to both the Consumer and Producer objects as constructor arguments. This provides both objects with the shared resources they need to perform their respective tasks. Two new Thread objects are then created, using the ThreadRun method for each object as an argument. Each worker thread, when started, will use this argument as the entry point for the thread.
Next, Main launches the two worker threads with a call to the Start method, like this:
producerThread.Start();
consumerThread.Start();
At this point, the two new worker threads are created and begin asynchronous execution, independent of the primary thread that is currently executing the Main method. In fact, the next thing Main does is suspend the primary thread with a call to the Sleep method. The method suspends the currently executing thread for a given number of milliseconds. Once this interval elapses, Main is reactivated, at which point it displays the contents of the queue. Main repeats this for four iterations, like this:
for (int i=0; i<4; i++)
{
    Thread.Sleep(2500);
    ShowQueueContents(queue);
}
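The life cycle described so far (construct an object, create a Thread with one of its instance methods as the entry point, start it, and later wait for it) can be reduced to the following sketch. SumWorker and ThreadStartDemo are hypothetical names introduced for illustration; the worker simply adds up integers instead of producing or consuming queue items.

```csharp
using System;
using System.Threading;

// Hypothetical worker whose instance method serves as a thread entry point.
public class SumWorker
{
    private readonly int _limit;
    private int _result;

    public SumWorker(int limit) { _limit = limit; }

    public int Result
    {
        get { return _result; }
    }

    // Entry point: runs on the worker thread once Start is called.
    public void ThreadRun()
    {
        int sum = 0;
        for (int i = 1; i <= _limit; i++)
        {
            sum += i;
        }
        _result = sum;
    }
}

public static class ThreadStartDemo
{
    public static int Compute(int limit)
    {
        SumWorker worker = new SumWorker(limit);
        Thread thread = new Thread(worker.ThreadRun); // created, not yet running
        thread.Start();                               // begins asynchronous execution
        thread.Join();                                // blocks until ThreadRun returns
        return worker.Result;
    }

    public static void Main()
    {
        Console.WriteLine(Compute(100)); // 5050
    }
}
```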
Finally, Main signals the worker threads to terminate by invoking the Set method of the "exit thread" event, and then calls the Join method on each worker thread to block the primary thread until each worker thread responds to the event and terminates:
syncEvents.ExitThreadEvent.Set();
producerThread.Join();
consumerThread.Join();
There is one final example of thread synchronization: the ShowQueueContents method. This method, like the consumer and producer threads, uses lock to gain exclusive access to the queue. In this case, however, exclusive access is particularly important, because ShowQueueContents enumerates the entire collection. Enumeration is especially vulnerable to concurrent modification because it traverses every element, so any change made by another thread during the traversal can invalidate it. Notice also that ShowQueueContents, because it is called by Main, is executed by the primary thread. This means that while this method holds the lock on the item queue, it blocks both the producer and consumer threads from accessing the queue. The ShowQueueContents method looks like this:
private static void ShowQueueContents(Queue<int> q)
{
    lock (((ICollection)q).SyncRoot)
    {
        foreach (int item in q)
        {
            Console.Write("{0} ", item);
        }
    }
    Console.WriteLine();
}
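Because the lock is held for the whole enumeration, console output happens while the worker threads are blocked. One possible variation, which is not what the sample does, is to copy the queue under the lock and format the copy afterward. The sketch below assumes that a point-in-time snapshot is acceptable; SnapshotDemo, Snapshot, and Format are hypothetical names.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Hypothetical variation: hold the lock only long enough to copy the queue.
public static class SnapshotDemo
{
    public static int[] Snapshot(Queue<int> q)
    {
        lock (((ICollection)q).SyncRoot)
        {
            return q.ToArray(); // copies the items in dequeue order
        }
    }

    // Formats the copy without holding any lock.
    public static string Format(int[] items)
    {
        string[] parts = Array.ConvertAll(items, i => i.ToString());
        return string.Join(" ", parts);
    }

    public static void Main()
    {
        Queue<int> queue = new Queue<int>();
        queue.Enqueue(22);
        queue.Enqueue(92);
        queue.Enqueue(64);
        Console.WriteLine(Format(Snapshot(queue))); // 22 92 64
    }
}
```

The trade-off is an extra array allocation per display in exchange for a shorter time during which the producer and consumer are blocked.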
The complete example follows.
Example
using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;

public class SyncEvents
{
    public SyncEvents()
    {
        _newItemEvent = new AutoResetEvent(false);
        _exitThreadEvent = new ManualResetEvent(false);
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }
    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }
    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}

public class Producer
{
    public Producer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }

    // Producer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        Random r = new Random();
        // Poll the exit event without blocking.
        while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                // Top the queue up to 20 items, signaling the consumer for each.
                while (_queue.Count < 20)
                {
                    _queue.Enqueue(r.Next(0,100));
                    _syncEvents.NewItemEvent.Set();
                    count++;
                }
            }
        }
        Console.WriteLine("Producer thread: produced {0} items", count);
    }

    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class Consumer
{
    public Consumer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }

    // Consumer.ThreadRun
    public void ThreadRun()
    {
        int count = 0;
        // Index 0 is the "new item" event; index 1 is the "exit thread" event.
        while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                int item = _queue.Dequeue();
            }
            count++;
        }
        Console.WriteLine("Consumer Thread: consumed {0} items", count);
    }

    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class ThreadSyncSample
{
    private static void ShowQueueContents(Queue<int> q)
    {
        lock (((ICollection)q).SyncRoot)
        {
            foreach (int item in q)
            {
                Console.Write("{0} ", item);
            }
        }
        Console.WriteLine();
    }

    static void Main()
    {
        Queue<int> queue = new Queue<int>();
        SyncEvents syncEvents = new SyncEvents();

        Console.WriteLine("Configuring worker threads...");
        Producer producer = new Producer(queue, syncEvents);
        Consumer consumer = new Consumer(queue, syncEvents);
        Thread producerThread = new Thread(producer.ThreadRun);
        Thread consumerThread = new Thread(consumer.ThreadRun);

        Console.WriteLine("Launching producer and consumer threads...");
        producerThread.Start();
        consumerThread.Start();

        for (int i=0; i<4; i++)
        {
            Thread.Sleep(2500);
            ShowQueueContents(queue);
        }

        Console.WriteLine("Signaling threads to terminate...");
        syncEvents.ExitThreadEvent.Set();

        producerThread.Join();
        consumerThread.Join();
    }
}
Sample Output
Configuring worker threads...
Launching producer and consumer threads...
22 92 64 70 13 59 9 2 43 52 91 98 50 96 46 22 40 94 24 87 79 54 5 39 21 29 77 77 1 68 69 81 4 75 43 70 87 72 59 0 69 98 54 92 16 84 61 30 45 50 17 86 16 59 20 73 43 21 38 46 84 59 11 87 77 5 53 65 7 16 66 26 79 74 26 37 56 92
Signaling threads to terminate...
Consumer Thread: consumed 1053771 items
Producer thread: produced 1053791 items
See Also
Tasks
Monitor Synchronization Technology Sample
Wait Synchronization Technology Sample
Reference
Thread Synchronization (C# Programming Guide)
lock Statement (C# Reference)
AutoResetEvent
ManualResetEvent
Set
Join
WaitOne
WaitAll
Queue
ICollection
Start
Sleep
WaitHandle
EventWaitHandle