방법: 컬렉션에 한계 지정 및 차단 기능 추가
이 예제에서는 클래스에서 System.Collections.Concurrent.IProducerConsumerCollection<T> 인터페이스를 구현한 다음, 클래스 인스턴스를 System.Collections.Concurrent.BlockingCollection<T>의 내부 스토리지 메커니즘으로 사용하여 사용자 지정 컬렉션 클래스에 한계 지정 및 차단 기능을 추가하는 방법을 보여 줍니다. 한계 지정 및 차단에 대한 자세한 내용은 BlockingCollection 개요를 참조하십시오.
예시
사용자 지정 컬렉션 클래스는 우선 순위 수준이 System.Collections.Concurrent.ConcurrentQueue<T>개 개체의 배열로 표시되는 기본 우선 순위 큐입니다. 각 큐 내에서 추가로 정렬이 수행되지는 않습니다.
클라이언트 코드에서 다음 세 가지 작업이 시작됩니다. 첫 번째 작업은 실행 중 언제든지 취소할 수 있도록 키보드 스트로크를 폴링합니다. 두 번째 작업은 공급자 스레드입니다. 이 작업에서는 새 항목을 차단 컬렉션에 추가하고 임의 값에 기반하여 각 항목에 우선 순위를 부여합니다. 세 번째 작업은 컬렉션에서 제공되는 대로 항목을 제거합니다.
이러한 스레드 중 하나가 다른 스레드보다 빠르게 실행되도록 함으로써 애플리케이션의 동작을 조정할 수 있습니다. 공급자가 더 빠르게 실행되는 경우 차단 컬렉션에 포함된 항목이 생성자에 지정한 항목 개수에 이미 도달되었으면 한계 지정 기능으로 인해 항목이 차단 컬렉션에 추가되지 않습니다. 소비자가 더 빠르게 실행되는 경우 차단 기능으로 인해 새 항목이 추가될 때까지 소비자가 기다리게 됩니다.
namespace ProdConsumerCS
{
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
// Implementation of a priority queue that has bounding and blocking functionality.
public class SimplePriorityQueue<TPriority, TValue> : IProducerConsumerCollection<KeyValuePair<int, TValue>>
{
// Each internal queue in the array represents a priority level.
// All elements in a given array share the same priority.
private ConcurrentQueue<KeyValuePair<int, TValue>>[] _queues = null;
// The number of queues we store internally.
private int priorityCount = 0;
private int m_count = 0;
public SimplePriorityQueue(int priCount)
{
this.priorityCount = priCount;
_queues = new ConcurrentQueue<KeyValuePair<int, TValue>>[priorityCount];
for (int i = 0; i < priorityCount; i++)
_queues[i] = new ConcurrentQueue<KeyValuePair<int, TValue>>();
}
// IProducerConsumerCollection members
public bool TryAdd(KeyValuePair<int, TValue> item)
{
_queues[item.Key].Enqueue(item);
Interlocked.Increment(ref m_count);
return true;
}
public bool TryTake(out KeyValuePair<int, TValue> item)
{
bool success = false;
// Loop through the queues in priority order
// looking for an item to dequeue.
for (int i = 0; i < priorityCount; i++)
{
// Lock the internal data so that the Dequeue
// operation and the updating of m_count are atomic.
lock (_queues)
{
success = _queues[i].TryDequeue(out item);
if (success)
{
Interlocked.Decrement(ref m_count);
return true;
}
}
}
// If we get here, we found nothing.
// Assign the out parameter to its default value and return false.
item = new KeyValuePair<int, TValue>(0, default(TValue));
return false;
}
public int Count
{
get { return m_count; }
}
// Required for ICollection
void ICollection.CopyTo(Array array, int index)
{
CopyTo(array as KeyValuePair<int, TValue>[], index);
}
// CopyTo is problematic in a producer-consumer.
// The destination array might be shorter or longer than what
// we get from ToArray due to adds or takes after the destination array was allocated.
// Therefore, all we try to do here is fill up destination with as much
// data as we have without running off the end.
public void CopyTo(KeyValuePair<int, TValue>[] destination, int destStartingIndex)
{
if (destination == null) throw new ArgumentNullException();
if (destStartingIndex < 0) throw new ArgumentOutOfRangeException();
int remaining = destination.Length;
KeyValuePair<int, TValue>[] temp = this.ToArray();
for (int i = 0; i < destination.Length && i < temp.Length; i++)
destination[i] = temp[i];
}
public KeyValuePair<int, TValue>[] ToArray()
{
KeyValuePair<int, TValue>[] result;
lock (_queues)
{
result = new KeyValuePair<int, TValue>[this.Count];
int index = 0;
foreach (var q in _queues)
{
if (q.Count > 0)
{
q.CopyTo(result, index);
index += q.Count;
}
}
return result;
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public IEnumerator<KeyValuePair<int, TValue>> GetEnumerator()
{
for (int i = 0; i < priorityCount; i++)
{
foreach (var item in _queues[i])
yield return item;
}
}
public bool IsSynchronized
{
get
{
throw new NotSupportedException();
}
}
public object SyncRoot
{
get { throw new NotSupportedException(); }
}
}
public class TestBlockingCollection
{
static void Main()
{
int priorityCount = 7;
SimplePriorityQueue<int, int> queue = new SimplePriorityQueue<int, int>(priorityCount);
var bc = new BlockingCollection<KeyValuePair<int, int>>(queue, 50);
CancellationTokenSource cts = new CancellationTokenSource();
Task.Run(() =>
{
if (Console.ReadKey(true).KeyChar == 'c')
cts.Cancel();
});
// Create a Task array so that we can Wait on it
// and catch any exceptions, including user cancellation.
Task[] tasks = new Task[2];
// Create a producer thread. You can change the code to
// make the wait time a bit slower than the consumer
// thread to demonstrate the blocking capability.
tasks[0] = Task.Run(() =>
{
// We randomize the wait time, and use that value
// to determine the priority level (Key) of the item.
Random r = new Random();
int itemsToAdd = 40;
int count = 0;
while (!cts.Token.IsCancellationRequested && itemsToAdd-- > 0)
{
int waitTime = r.Next(2000);
int priority = waitTime % priorityCount;
var item = new KeyValuePair<int, int>(priority, count++);
bc.Add(item);
Console.WriteLine("added pri {0}, data={1}", item.Key, item.Value);
}
Console.WriteLine("Producer is done adding.");
bc.CompleteAdding();
},
cts.Token);
//Give the producer a chance to add some items.
Thread.SpinWait(1000000);
// Create a consumer thread. The wait time is
// a bit slower than the producer thread to demonstrate
// the bounding capability at the high end. Change this value to see
// the consumer run faster to demonstrate the blocking functionality
// at the low end.
tasks[1] = Task.Run(() =>
{
while (!bc.IsCompleted && !cts.Token.IsCancellationRequested)
{
Random r = new Random();
int waitTime = r.Next(2000);
Thread.SpinWait(waitTime * 70);
// KeyValuePair is a value type. Initialize to avoid compile error in if(success)
KeyValuePair<int, int> item = new KeyValuePair<int, int>();
bool success = false;
success = bc.TryTake(out item);
if (success)
{
// Do something useful with the data.
Console.WriteLine("removed Pri = {0} data = {1} collCount= {2}", item.Key, item.Value, bc.Count);
}
else
{
Console.WriteLine("No items to retrieve. count = {0}", bc.Count);
}
}
Console.WriteLine("Exited consumer loop");
},
cts.Token);
try {
Task.WaitAll(tasks, cts.Token);
}
catch (OperationCanceledException e) {
if (e.CancellationToken == cts.Token)
Console.WriteLine("Operation was canceled by user. Press any key to exit");
}
catch (AggregateException ae) {
foreach (var v in ae.InnerExceptions)
Console.WriteLine(v.Message);
}
finally {
cts.Dispose();
}
Console.ReadKey(true);
}
}
}
기본적으로 System.Collections.Concurrent.BlockingCollection<T>의 스토리지는 System.Collections.Concurrent.ConcurrentQueue<T>입니다.
참고 항목
.NET