방법: BlockingCollection에서 개별적으로 항목 추가 및 가져오기
이 예제에서는 블로킹 및 비블로킹 방식으로 BlockingCollection<T>에서 항목을 추가 및 제거하는 방법을 보여 줍니다. BlockingCollection<T>에 대한 자세한 내용은 BlockingCollection 개요을 참조하십시오.
BlockingCollection<T>이 비어 있고 더 이상 요소가 추가되지 않을 때까지 열거하는 방법을 보여 주는 예제는 방법: ForEach를 사용하여 BlockingCollection의 항목 제거를 참조하십시오.
예제
이 첫 번째 예제에서는 항목을 가져올 때 컬렉션이 일시적으로 비어 있거나 항목을 추가할 때 최대 용량에 도달해 있는 경우 또는 지정된 제한 시간이 경과한 경우에 작업이 차단되도록 항목을 추가하고 가져오는 방법을 보여 줍니다. 최대 용량에 대한 블로킹은 생성자에서 지정된 최대 용량으로 BlockingCollection을 만든 경우에만 사용할 수 있습니다.
Option Strict On
Option Explicit On
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks
Module SimpleBlocking
Class Program
Shared Sub Main()
' Increase or decrease this value as desired.
Dim itemsToAdd As Integer = 500
' Preserve all the display output for Adds and Takes
Console.SetBufferSize(80, (itemsToAdd * 2) + 3)
' A bounded collection. Increase, decrease, or remove the
' maximum capacity argument to see how it impacts behavior.
Dim numbers = New BlockingCollection(Of Integer)(50)
' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
Dim i As Integer = -1
While numbers.IsCompleted = False
Try
i = numbers.Take()
Catch ioe As InvalidOperationException
Console.WriteLine("Adding was completed!")
Exit While
End Try
Console.WriteLine("Take:{0} ", i)
' Simulate a slow consumer. This will cause
' collection to fill up fast and thus Adds wil block.
Thread.SpinWait(100000)
End While
Console.WriteLine(vbCrLf & "No more items to take. Press the Enter key to exit.")
End Sub)
' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
For i As Integer = 0 To itemsToAdd
numbers.Add(i)
Console.WriteLine("Add:{0} Count={1}", i, numbers.Count)
Next
'See documentation for this method.
numbers.CompleteAdding()
End Sub)
'Keep the console window open in debug mode.
Console.ReadLine()
End Sub
End Class
End Module
namespace BCBlockingAccess
{
using System;
using System.Collections.Concurrent;
class Program
{
static void Main(string[] args)
{
// Increase or decrease this value as desired.
int itemsToAdd = 500;
// Preserve all the display output for Adds and Takes
Console.SetBufferSize(80, (itemsToAdd * 2) + 3);
// A bounded collection. Increase, decrease, or remove the
// maximum capacity argument to see how it impacts behavior.
BlockingCollection<int> numbers = new BlockingCollection<int>(50);
// A simple blocking consumer with no cancellation.
Task.Factory.StartNew(() =>
{
int i = -1;
while (!numbers.IsCompleted)
{
try
{
i = numbers.Take();
}
catch (InvalidOperationException)
{
Console.WriteLine("Adding was compeleted!");
break;
}
Console.WriteLine("Take:{0} ", i);
// Simulate a slow consumer. This will cause
// collection to fill up fast and thus Adds wil block.
Thread.SpinWait(100000);
}
Console.WriteLine("\r\nNo more items to take. Press the Enter key to exit.");
});
// A simple blocking producer with no cancellation.
Task.Factory.StartNew(() =>
{
for (int i = 0; i < itemsToAdd; i++)
{
numbers.Add(i);
Console.WriteLine("Add:{0} Count={1}", i, numbers.Count);
}
// See documentation for this method.
numbers.CompleteAdding();
});
// Keep the console display open in debug mode.
Console.ReadLine();
}
}
}
이 두 번째 예제에서는 작업이 차단되지 않도록 항목을 추가하고 가져오는 방법을 보여 줍니다. 바인딩된 컬렉션에 항목이 전혀 없거나 이 컬렉션의 최대 용량에 도달한 경우 또는 제한 시간이 경과한 경우 TryAdd() 또는 TryTake() 작업에서는 false를 반환합니다. 이 메커니즘을 통해 스레드는 일시적으로 다른 유용한 작업을 수행한 후 나중에 새 항목 검색을 다시 시도하거나 이전에 추가하지 못한 항목을 추가하려고 할 수 있습니다. 예제 프로그램에서는 BlockingCollection<T>에 액세스할 때 취소를 구현하는 방법도 보여 줍니다.
Option Strict On
Option Explicit On
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks
Module NonBlockingBC
Class NonBlockingAccess
Shared inputs As Integer
Shared Sub Main()
' The token source for issuing the cancelation request.
Dim cts As CancellationTokenSource = New CancellationTokenSource()
' A blocking collection that can hold no more than 100 items at a time.
Dim numberCollection As BlockingCollection(Of Integer) = New BlockingCollection(Of Integer)(100)
' Set console buffer to hold our prodigious output.
Console.SetBufferSize(80, 2000)
' The simplest UI thread ever invented.
Task.Factory.StartNew(Sub()
If Console.ReadKey.KeyChar() = "c"c Then
cts.Cancel()
End If
End Sub)
' Start one producer and one consumer.
Task.Factory.StartNew(Sub() NonBlockingConsumer(numberCollection, cts.Token))
Task.Factory.StartNew(Sub() NonBlockingProducer(numberCollection, cts.Token))
Console.WriteLine("Press the Enter key to exit.")
Console.ReadLine()
End Sub
Shared Sub NonBlockingConsumer(ByVal bc As BlockingCollection(Of Integer), ByVal ct As CancellationToken)
' IsCompleted is equivalent to (IsAddingCompleted And Count = 0)
While bc.IsCompleted = False
Dim nextItem As Integer = 0
Try
If bc.TryTake(nextItem, 0, ct) Then
Console.WriteLine(" Take Blocked.")
Else
Console.WriteLine(" Take: {0}", nextItem)
End If
Catch ex As OperationCanceledException
Console.WriteLine("Taking canceled.")
Exit While
End Try
'Slow down consumer just a little to cause
' collection to fill up faster, and lead to "AddBlocked"
Thread.SpinWait(500000)
End While
Console.WriteLine(vbCrLf & "No more items to take. Press the Enter key to exit.")
End Sub
Shared Sub NonBlockingProducer(ByVal bc As BlockingCollection(Of Integer), ByVal ct As CancellationToken)
Dim itemToAdd As Integer = 0
Dim success As Boolean = False
Do While itemToAdd < inputs
'Cancellation causes OCE. We know how to handle it.
Try
success = bc.TryAdd(itemToAdd, 2, ct)
Catch ex As OperationCanceledException
Console.WriteLine("Add loop canceled.")
' Let other threads know we're done in case
' they aren't monitoring the cancellation token.
bc.CompleteAdding()
Exit Do
End Try
If success = True Then
Console.WriteLine(" Add:{0}", itemToAdd)
itemToAdd = itemToAdd + 1
Else
Console.Write(" AddBlocked:{0} Count = {1}", itemToAdd.ToString(), bc.Count)
' Don't increment nextItem. Try again on next iteration
' Do something else useful instead.
UpdateProgress(itemToAdd)
End If
Loop
End Sub
Shared Sub UpdateProgress(ByVal i As Integer)
Dim percent As Double = (CType(i, Double) / inputs) * 100
Console.WriteLine("Percent complete: {0}", percent)
End Sub
End Class
End Module
namespace BCNonBlockingWithCancellation
{
using System;
using System.Collections.Concurrent;
class ProgramWithCancellation
{
static int inputs = 2000;
static void Main(string[] args)
{
// The token source for issuing the cancelation request.
CancellationTokenSource cts = new CancellationTokenSource();
// A blocking collection that can hold no more than 100 items at a time.
BlockingCollection<int> numberCollection = new BlockingCollection<int>(100);
// Set console buffer to hold our prodigious output.
Console.SetBufferSize(80, 2000);
// The simplest UI thread ever invented.
Task.Factory.StartNew(() =>
{
if (Console.ReadKey().KeyChar == 'c')
cts.Cancel();
});
// Start one producer and one consumer.
Task.Factory.StartNew(() => NonBlockingConsumer(numberCollection, cts.Token));
Task.Factory.StartNew(() => NonBlockingProducer(numberCollection, cts.Token));
Console.WriteLine("Press the Enter key to exit.");
Console.ReadLine();
}
static void NonBlockingConsumer(BlockingCollection<int> bc, CancellationToken ct)
{
// IsCompleted == (IsAddingCompleted && Count == 0)
while (!bc.IsCompleted)
{
int nextItem = 0;
try
{
if (!bc.TryTake(out nextItem, 0, ct))
{
Console.WriteLine(" Take Blocked");
}
else
Console.WriteLine(" Take:{0}", nextItem);
}
catch (OperationCanceledException)
{
Console.WriteLine("Taking canceled.");
break;
}
// Slow down consumer just a little to cause
// collection to fill up faster, and lead to "AddBlocked"
Thread.SpinWait(500000);
}
Console.WriteLine("\r\nNo more items to take. Press the Enter key to exit.");
}
static void NonBlockingProducer(BlockingCollection<int> bc, CancellationToken ct)
{
int itemToAdd = 0;
bool success = false;
do
{
// Cancellation causes OCE. We know how to handle it.
try
{
// A shorter timeout causes more failures.
success = bc.TryAdd(itemToAdd, 2, ct);
}
catch (OperationCanceledException)
{
Console.WriteLine("Add loop canceled.");
// Let other threads know we're done in case
// they aren't monitoring the cancellation token.
bc.CompleteAdding();
break;
}
if (success)
{
Console.WriteLine(" Add:{0}", itemToAdd);
itemToAdd++;
}
else
{
Console.Write(" AddBlocked:{0} Count = {1}", itemToAdd.ToString(), bc.Count);
// Don't increment nextItem. Try again on next iteration.
//Do something else useful instead.
UpdateProgress(itemToAdd);
}
} while (itemToAdd < inputs);
// No lock required here because only one producer.
bc.CompleteAdding();
}
static void UpdateProgress(int i)
{
double percent = ((double)i / inputs) * 100;
Console.WriteLine("Percent complete: {0}", percent);
}
}
}