방법: 생산자와 소비자 스레드 동기화(C# 프로그래밍 가이드)
업데이트: 2007년 11월
다음 예제에서는 lock 키워드와 AutoResetEvent 및 ManualResetEvent 클래스를 사용하여 기본 스레드와 두 작업자 스레드 사이에 스레드 동기화를 수행하는 방법을 보여 줍니다. 자세한 내용은 lock 문(C# 참조)을 참조하십시오.
이 예제에서는 두 개의 보조(작업자) 스레드를 만듭니다. 스레드 하나는 요소를 생성하고 이 요소를 스레드로부터 안전하게 보호되지 않는 제네릭 큐에 저장합니다. 자세한 내용은 Queue<T>을 참조하십시오. 다른 스레드는 이 큐의 항목을 사용합니다. 또한 기본 스레드는 큐의 내용을 정기적으로 표시하여 세 가지 스레드가 큐에 액세스할 수 있도록 합니다. lock 키워드는 큐에 대한 액세스를 동기화하여 큐의 상태가 손상되지 않도록 하는 데 사용됩니다.
lock 키워드를 사용하여 큐에 동시에 액세스하지 못하도록 방지할 뿐만 아니라 두 가지 이벤트 개체를 통해 추가로 동기화를 제공합니다. 한 개체는 작업자 스레드를 종료하도록 신호를 보내는 데 사용되고, 다른 개체는 새 항목이 큐에 추가된 경우 생산자 스레드가 소비자 스레드에 신호를 보내기 위해 사용됩니다. 이러한 두 이벤트 개체는 SyncEvents라는 클래스에 캡슐화되어 있습니다. 이렇게 하면 소비자 및 생산자 스레드를 나타내는 개체에 이벤트를 쉽게 전달할 수 있습니다. SyncEvents 클래스는 다음과 같이 정의됩니다.
public class SyncEvents
{
public SyncEvents()
{
_newItemEvent = new AutoResetEvent(false);
_exitThreadEvent = new ManualResetEvent(false);
_eventArray = new WaitHandle[2];
_eventArray[0] = _newItemEvent;
_eventArray[1] = _exitThreadEvent;
}
public EventWaitHandle ExitThreadEvent
{
get { return _exitThreadEvent; }
}
public EventWaitHandle NewItemEvent
{
get { return _newItemEvent; }
}
public WaitHandle[] EventArray
{
get { return _eventArray; }
}
private EventWaitHandle _newItemEvent;
private EventWaitHandle _exitThreadEvent;
private WaitHandle[] _eventArray;
}
AutoResetEvent 클래스는 "새 항목" 이벤트에 사용됩니다. 이렇게 하면 소비자 스레드가 이 이벤트에 응답할 때마다 이 이벤트가 자동으로 다시 설정되도록 할 수 있습니다. 또는 ManualResetEvent 클래스를 "종료" 이벤트에 사용할 수 있습니다. 이렇게 하면 이 이벤트가 신호를 받을 때 여러 스레드가 응답하도록 할 수 있습니다. AutoResetEvent를 대신 사용하는 경우 스레드가 하나만 이벤트에 응답하면 이벤트가 신호를 받지 않은 상태로 되돌아갑니다. 다른 스레드는 응답하지 않으므로 이 경우 종료에 실패합니다.
SyncEvents 클래스는 두 개의 이벤트를 만들고 이를 서로 다른 두 형식으로 저장합니다. EventWaitHandle은 AutoResetEvent와 ManualResetEvent 모두의 기본 클래스 형식이고 WaitHandle을 기반으로 한 배열에 있습니다. 소비자 스레드 설명에서 알 수 있듯이 이 배열은 소비자 스레드가 이벤트에 응답하는 데 필요합니다.
소비자 및 생산자 스레드는 Consumer 및 Producer라는 클래스로 표현됩니다. 두 클래스는 모두 ThreadRun이라는 메서드를 정의합니다. 이러한 메서드는 Main 메서드가 만드는 작업자 스레드의 진입점으로 사용됩니다.
Producer 클래스에서 정의된 ThreadRun 메서드는 다음과 비슷합니다.
// Producer.ThreadRun
public void ThreadRun()
{
int count = 0;
Random r = new Random();
while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
{
lock (((ICollection)_queue).SyncRoot)
{
while (_queue.Count < 20)
{
_queue.Enqueue(r.Next(0,100));
_syncEvents.NewItemEvent.Set();
count++;
}
}
}
Console.WriteLine("Producer thread: produced {0} items", count);
}
이 메서드는 "스레드 종료" 이벤트가 신호를 받을 때까지 반복됩니다. 이 이벤트의 상태는 SyncEvents 클래스에서 정의된 ExitThreadEvent 속성을 사용하여 WaitOne 메서드로 테스트합니다. 이 경우 현재 스레드를 차단하지 않고 이벤트 상태가 확인됩니다. WaitOne에 사용되는 첫 번째 인수가 0이며 이는 메서드가 결과를 즉시 반환해야 함을 나타내기 때문입니다. WaitOne이 true를 반환하는 경우 해당 이벤트는 현재 신호를 받은 상태입니다. 이 경우 ThreadRun 메서드가 반환하는 결과로 인해 이 메서드를 실행하는 작업자 스레드가 종료됩니다.
"스레드 종료" 이벤트가 신호를 받을 때까지 Producer.ThreadStart 메서드는 큐에 20개의 항목을 유지합니다. 항목은 0에서 100 사이의 정수입니다. 소비자 스레드와 기본 스레드가 컬렉션에 동시에 액세스하지 못하도록 방지하기 위해 새 항목을 추가하기 전에 컬렉션을 잠가야 합니다. 이 작업은 lock 키워드를 사용하여 수행됩니다. lock에 전달되는 인수는 ICollection 인터페이스를 통해 노출되는 SyncRoot 필드입니다. 이 필드는 스레드 액세스를 통기화하기 위해 특별히 제공됩니다. lock 뒤에 나오는 코드 블록에 포함된 명령에는 컬렉션에 대한 단독 액세스 권한이 부여됩니다. 생산자가 큐에 추가하는 각각의 새 항목과 관련하여 "새 항목" 이벤트에 대한 Set 메서드가 호출됩니다. 이 메서드를 호출하면 새 항목을 처리하기 위해 일시 중단 상태에서 벗어나도록 소비자 스레드에 신호가 전달됩니다.
Consumer 개체도 ThreadRun이라는 메서드를 정의합니다. 생산자 버전의 ThreadRun과 마찬가지로 이 메서드는 Main 메서드가 만든 작업자 스레드를 통해 실행됩니다. 그러나 소비자 버전의 ThreadStart는 두 가지 이벤트에 응답해야 합니다. Consumer.ThreadRun 메서드는 다음과 비슷합니다.
// Consumer.ThreadRun
public void ThreadRun()
{
int count = 0;
while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
{
lock (((ICollection)_queue).SyncRoot)
{
int item = _queue.Dequeue();
}
count++;
}
Console.WriteLine("Consumer Thread: consumed {0} items", count);
}
이 메서드는 제공된 배열의 대기 핸들에 신호가 전달될 때까지 WaitAny를 사용하여 소비자 스레드를 차단합니다. 이 경우 배열에는 두 개의 핸들이 있습니다. 하나는 작업자 스레드를 종료하기 위한 것이고 다른 하나는 새 항목이 컬렉션에 추가되었음을 나타내기 위한 것입니다. WaitAny는 신호를 받은 이벤트의 인덱스를 반환합니다. "새 항목" 이벤트는 배열의 첫 번째 이벤트이므로 인덱스가 0인 경우 새 항목을 나타냅니다. 이 경우 "스레드 종료" 이벤트를 나타내는 인덱스 1을 검사합니다. 이 작업은 이 메서드에서 계속 항목을 사용할지 여부를 확인하는 데 사용됩니다. "새 항목" 이벤트에 신호가 전달되면 lock을 사용하여 컬렉션에 단독으로 액세스하고 새 항목을 사용할 수 있습니다. 이 예제에서는 수천 개의 항목이 생성되고 사용되므로 사용되는 각 항목을 표시하지는 않습니다. 이 예제에서는 Main을 대신 사용하여 큐의 내용을 정기적으로 표시합니다.
Main 메서드는 먼저 해당 내용을 생성하고 사용할 큐와 앞에서 살펴본 SyncEvents의 인스턴스를 만듭니다.
Queue<int> queue = new Queue<int>();
SyncEvents syncEvents = new SyncEvents();
그런 다음 Main 메서드는 작업자 스레드에 사용할 Producer 및 Consumer 개체를 구성합니다. 그러나 이 단계에서 실제 작업자 스레드를 만들거나 시작하지는 않습니다.
Producer producer = new Producer(queue, syncEvents);
Consumer consumer = new Consumer(queue, syncEvents);
Thread producerThread = new Thread(producer.ThreadRun);
Thread consumerThread = new Thread(consumer.ThreadRun);
큐와 동기화 이벤트 개체가 Consumer 및 Producer 스레드에 모두 생성자 인수로 전달됩니다. 이렇게 하면 두 개체가 해당 작업을 수행하는 데 필요한 공유 리소스를 사용할 수 있습니다. 그런 다음 각 개체에 대한 ThreadRun 메서드를 인수로 사용하여 두 개의 새로운 Thread 개체를 만듭니다. 각 작업자 스레드를 시작하면 이 인수가 스레드에 대한 진입점으로 사용됩니다.
이제 Main 메서드가 다음과 같이 Start 메서드를 호출하여 두 작업자 스레드를 시작합니다.
producerThread.Start();
consumerThread.Start();
이 시점에서 두 개의 새로운 작업자 스레드가 작성되고 현재 Main 메서드를 실행하고 있는 기본 스레드와는 독립적으로 비동기 실행을 시작합니다. 사실 Main 메서드가 다음에 수행하는 작업은 Sleep 메서드를 호출하여 기본 스레드를 일시 중단하는 일입니다. 이 메서드는 지정된 밀리초 단위 시간동안 현재 실행 중인 스레드를 일시 중단합니다. 지정된 시간이 경과하면 Main이 다시 활성화되고 이때 큐의 내용이 표시됩니다. Main은 다음과 같이 이 작업을 네 번 반복합니다.
for (int i=0; i<4; i++)
{
Thread.Sleep(2500);
ShowQueueContents(queue);
}
마지막으로 Main은 "스레드 종료" 이벤트의 Set 메서드를 호출하여 작업자 스레드를 종료하도록 신호를 보낸 다음 각 작업자 스레드에 대해 Join 메서드를 호출하여 각 작업자 스레드가 이벤트에 응답하고 종료될 때까지 기본 스레드를 차단합니다.
스레드 동기화의 마지막 예로 ShowQueueContents 메서드를 하나 더 들 수 있습니다. 소비자 및 생산자 스레드와 마찬가지로 이 메서드는 lock을 사용하여 큐에 단독으로 액세스합니다. 그러나 이 경우 ShowQueueContents에서 컬렉션의 모든 항목을 열거하므로 단독 액세스가 매우 중요합니다. 컬렉션의 항목을 열거하려면 전체 컬렉션의 내용 간에 이동해야 하므로 비동기 작업으로 인해 데이터가 손상될 위험이 특히 높습니다.
ShowQueueContents는 Main에서 호출되므로 기본 스레드를 통해 실행됩니다. 즉, 이 메서드는 항목 큐에 대한 단독 액세스 권한을 획득할 때 생산자 및 소비자 스레드가 모두 큐에 액세스하지 못하도록 차단합니다. ShowQueueContents는 큐를 잠그고 해당 내용을 열거합니다.
private static void ShowQueueContents(Queue<int> q)
{
lock (((ICollection)q).SyncRoot)
{
foreach (int item in q)
{
Console.Write("{0} ", item);
}
}
Console.WriteLine();
}
다음은 완성된 예제입니다.
예제
using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;
public class SyncEvents
{
public SyncEvents()
{
_newItemEvent = new AutoResetEvent(false);
_exitThreadEvent = new ManualResetEvent(false);
_eventArray = new WaitHandle[2];
_eventArray[0] = _newItemEvent;
_eventArray[1] = _exitThreadEvent;
}
public EventWaitHandle ExitThreadEvent
{
get { return _exitThreadEvent; }
}
public EventWaitHandle NewItemEvent
{
get { return _newItemEvent; }
}
public WaitHandle[] EventArray
{
get { return _eventArray; }
}
private EventWaitHandle _newItemEvent;
private EventWaitHandle _exitThreadEvent;
private WaitHandle[] _eventArray;
}
public class Producer
{
public Producer(Queue<int> q, SyncEvents e)
{
_queue = q;
_syncEvents = e;
}
// Producer.ThreadRun
public void ThreadRun()
{
int count = 0;
Random r = new Random();
while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
{
lock (((ICollection)_queue).SyncRoot)
{
while (_queue.Count < 20)
{
_queue.Enqueue(r.Next(0,100));
_syncEvents.NewItemEvent.Set();
count++;
}
}
}
Console.WriteLine("Producer thread: produced {0} items", count);
}
private Queue<int> _queue;
private SyncEvents _syncEvents;
}
public class Consumer
{
public Consumer(Queue<int> q, SyncEvents e)
{
_queue = q;
_syncEvents = e;
}
// Consumer.ThreadRun
public void ThreadRun()
{
int count = 0;
while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
{
lock (((ICollection)_queue).SyncRoot)
{
int item = _queue.Dequeue();
}
count++;
}
Console.WriteLine("Consumer Thread: consumed {0} items", count);
}
private Queue<int> _queue;
private SyncEvents _syncEvents;
}
public class ThreadSyncSample
{
private static void ShowQueueContents(Queue<int> q)
{
lock (((ICollection)q).SyncRoot)
{
foreach (int item in q)
{
Console.Write("{0} ", item);
}
}
Console.WriteLine();
}
static void Main()
{
Queue<int> queue = new Queue<int>();
SyncEvents syncEvents = new SyncEvents();
Console.WriteLine("Configuring worker threads...");
Producer producer = new Producer(queue, syncEvents);
Consumer consumer = new Consumer(queue, syncEvents);
Thread producerThread = new Thread(producer.ThreadRun);
Thread consumerThread = new Thread(consumer.ThreadRun);
Console.WriteLine("Launching producer and consumer threads...");
producerThread.Start();
consumerThread.Start();
for (int i=0; i<4; i++)
{
Thread.Sleep(2500);
ShowQueueContents(queue);
}
Console.WriteLine("Signaling threads to terminate...");
syncEvents.ExitThreadEvent.Set();
producerThread.Join();
consumerThread.Join();
}
}
Configuring worker threads...
Launching producer and consumer threads...
22 92 64 70 13 59 9 2 43 52 91 98 50 96 46 22 40 94 24 87
79 54 5 39 21 29 77 77 1 68 69 81 4 75 43 70 87 72 59
0 69 98 54 92 16 84 61 30 45 50 17 86 16 59 20 73 43 21
38 46 84 59 11 87 77 5 53 65 7 16 66 26 79 74 26 37 56 92
Signalling threads to terminate...
Consumer Thread: consumed 1053771 items
Producer thread: produced 1053791 items