BlockingCollection の概要
BlockingCollection<T> は、次のような機能を提供するスレッド セーフなコレクション クラスです。
プロデューサー/コンシューマー パターンの実装。
複数のスレッドからの項目の同時追加と同時取得。
最大容量 (オプション)。
コレクションが空またはいっぱいになったときにブロックする挿入操作と削除操作。
ブロックせずに実行されるか、指定された時間が経過するまでブロックする、挿入および削除の "試行" 操作。
IProducerConsumerCollection<T> を実装する任意のコレクション型のカプセル化。
キャンセル トークンを使用したキャンセル。
foreach (Visual Basic では For Each) を使用した 2 種類の列挙。
読み取り専用の列挙。
列挙された項目を削除する列挙。
境界とブロッキングのサポート
BlockingCollection<T> は境界とブロッキングをサポートします。 境界は、コレクションの最大容量を設定できることを意味します。 境界を使用すると、メモリ内のコレクションの最大サイズを制御し、producer スレッドが consumer スレッドよりも先に進行しすぎるのを防ぐことができます。これは、特定のシナリオで重要になります。
コレクションには、複数のスレッドやタスクが同時に項目を追加できます。コレクションが指定された最大容量に達すると、producer スレッドは項目が削除されるまでブロックします。 複数の cosumer が同時に項目を削除できます。コレクションが空になった場合、consumer スレッドは、producer が項目を追加するまでブロックします。 producer スレッドでは、それ以上項目が追加されないことを示すために、CompleteAdding を呼び出すことができます。 consumer では、IsCompleted プロパティを監視して、コレクションが空になったときや、それ以上の項目は追加されないことになったときを把握できます。 次の例は、容量の上限が 100 に設定された単純な BlockingCollection を示しています。 いくつかの外部条件が true である間、producer タスクはコレクションに項目を追加し、CompleteAdding を呼び出します。 consumer タスクは、IsCompleted プロパティが true になるまで項目を取得します。
' A bounded collection. It can hold no more
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)
' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
While dataItems.IsCompleted = False
Dim dataItem As Data = Nothing
Try
dataItem = dataItems.Take()
Catch e As InvalidOperationException
' IOE means that Take() was called on a completed collection.
' In this example, we can simply catch the exception since the
' loop will break on the next iteration.
End Try
If (dataItem IsNot Nothing) Then
Process(dataItem)
End If
End While
Console.WriteLine(vbCrLf & "No more items to take.")
End Sub)
' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
While moreItemsToAdd = True
Dim item As Data = GetData()
' Blocks if numbers.Count = dataItems.BoundedCapacity
dataItems.Add(item)
End While
' Let consumer know we are done.
dataItems.CompleteAdding()
End Sub)
// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);
// A simple blocking consumer with no cancellation.
Task.Factory.StartNew(() =>
{
while (!dataItems.IsCompleted)
{
Data data = null;
// Blocks if number.Count == 0
// IOE means that Take() was called on a completed collection.
// Some other thread can call CompleteAdding after we pass the
// IsCompleted check but before we call Take.
// In this example, we can simply catch the exception since the
// loop will break on the next iteration.
try
{
data = dataItems.Take();
}
catch (InvalidOperationException) { }
if (data != null)
{
Process(data);
}
}
Console.WriteLine("\r\nNo more items to take.");
});
// A simple blocking producer with no cancellation.
Task.Factory.StartNew(() =>
{
while (moreItemsToAdd)
{
Data data = GetData();
// Blocks if numbers.Count == dataItems.BoundedCapacity
dataItems.Add(data);
}
// Let consumer know we are done.
dataItems.CompleteAdding();
});
コード例全体については、「方法: BlockingCollection の項目を個別に追加および取得する」を参照してください。
時間制限付きのブロッキング操作
境界のあるコレクションにおける時間制限付きの TryAdd 操作および TryTake 操作では、メソッドは項目を追加または取得しようと試みます。 項目を使用できる場合、その項目は参照渡しで渡された変数に格納され、メソッドは true を返します。 指定されたタイムアウト期間を経過しても項目が取得されない場合、メソッドは false を返します。 その後、再びコレクションへのアクセスを試みる前に、他の必要な処理をスレッドで自由に実行できます。 時間制限付きのブロッキング アクセスの例については、「方法: BlockingCollection の項目を個別に追加および取得する」の 2 番目の例を参照してください。
追加操作と取得操作の取り消し
追加操作と取得操作は、通常、ループ内で実行されます。 TryAdd メソッドまたは TryTake メソッドに CancellationToken を渡し、各イテレーションでトークンの IsCancellationRequested プロパティの値を確認するようにすると、ループを取り消すことができます。 値が true である場合は、キャンセル要求に応答するかどうかを決定できます。応答するには、リソースをクリーンアップし、ループを終了します。 次の例は、キャンセル トークンを受け取る TryAdd のオーバーロードと、それを使用するコードを示しています。
Do While moreItems = True
' Cancellation causes OCE. We know how to handle it.
Try
success = bc.TryAdd(itemToAdd, 2, ct)
Catch ex As OperationCanceledException
bc.CompleteAdding()
Exit Do
End Try
Loop
do
{
// Cancellation causes OCE. We know how to handle it.
try
{
success = bc.TryAdd(itemToAdd, 2, ct);
}
catch (OperationCanceledException)
{
bc.CompleteAdding();
break;
}
//...
} while (moreItems == true);
キャンセルのサポートを追加する方法の例については、「方法: BlockingCollection の項目を個別に追加および取得する」の 2 番目の例を参照してください。
コレクション型の指定
BlockingCollection<T> の作成時には、容量の上限だけでなく、使用するコレクションの型も指定できます。 たとえば、先入れ先出し方式 (FIFO) の動作には ConcurrentConcurrentQueue を、後入れ先出し方式 (LIFO) の動作には ConcurrentStack<T> を指定できます。 IProducerConsumerCollection<T> インターフェイスを実装する任意のコレクション クラスを使用できます。 BlockingCollection<T> の既定のコレクション型は ConcurrentQueue<T> です。 次のコード例では、容量が 1,000 で ConcurrentBag を使用する、文字列の BlockingCollection<T> を作成する方法を示します。
Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );
詳細については、「方法: 境界ブロッキング機能をコレクション クラスに追加する」を参照してください。
IEnumerable のサポート
BlockingCollection<T> は、GetConsumingEnumerable メソッドを提供します。これにより、consumer は foreach (Visual Basic では For Each) を使用して、コレクションが完成するまで (コレクションが空になり、それ以上項目が追加されなくなるまで) 項目を削除できます。 詳細については、「方法: ForEach を使用して BlockingCollection 内の項目を削除する」を参照してください。
多数の BlockingCollection を 1 つとして使用
consumer が複数のコレクションから同時に項目を取得する必要のあるシナリオでは、BlockingCollection<T> の配列を作成し、TakeFromAny や AddToAny などの静的メソッドを使用できます。これらのメソッドでは、配列内の任意のコレクションを対象に追加または取得の操作を実行できます。 いずれかのコレクションがブロックしている場合、メソッドはすぐに別のコレクションを試します。これは、操作を実行できるコレクションが見つかるまで続行されます。 詳細については、「方法: パイプラインでブロッキング コレクションの配列を使用する」を参照してください。