方法: ForEach を使用して BlockingCollection 内の項目を削除する
Take メソッドおよび TryTake メソッドを使用して BlockingCollection<T> からアイテムを取得するだけでなく、foreach (Visual Basic では For Each) を使用して、追加が完了してコレクションが空になるまで、アイテムを削除することもできます。 これは、列挙の変更または列挙の消費と呼ばれます。この列挙子は、通常の foreach (For Each) ループとは異なり、アイテムを削除することでソース コレクションを変更するためです。
使用例
foreach (For Each) ループを使用して BlockingCollection<T> 内のすべてのアイテムを削除する方法を次の例に示します。
Option Strict On
Option Explicit On
Imports System.Diagnostics
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Collections.Concurrent
Module EnumerateBC
Class Program
' Limit the collection size to 2000 items
' at any given time. Set itemsToProduce to >500
' to hit the limit.
Const upperLimit As Integer = 1000
' Adjust this number to see how it impacts
' the producing-consuming pattern.
Const itemsToProduce As Integer = 100
Shared collection As BlockingCollection(Of Long) = New BlockingCollection(Of Long)(upperLimit)
' Variables for diagnostic output only.
Shared sw As New Stopwatch()
Shared totalAdditions As Integer = 0
' Counter for synchronizing producers.
Shared producersStillRunning As Integer = 2
Shared Sub Main()
' Start the stopwatch.
sw.Start()
' Queue the Producer threads.
Dim task1 = Task.Factory.StartNew(Sub() RunProducer("A", 0))
Dim task2 = Task.Factory.StartNew(Sub() RunProducer("B", itemsToProduce))
' Store in an array for use with ContinueWhenAll
Dim producers() As Task = {task1, task2}
' Create a cleanup task that will call CompleteAdding after
' all producers are done adding items.
Dim cleanup As Task = Task.Factory.ContinueWhenAll(producers, Sub(p) collection.CompleteAdding())
' Queue the Consumer thread. Put this call
' before Parallel.Invoke to begin consuming as soon as
' the producers add items.
Task.Factory.StartNew(Sub() RunConsumer())
' Keep the console window open while the
' consumer thread completes its output.
Console.ReadKey()
End Sub
Shared Sub RunProducer(ByVal ID As String, ByVal start As Integer)
Dim additions As Integer = 0
For i As Integer = start To start + itemsToProduce - 1
' The data that is added to the collection.
Dim ticks As Long = sw.ElapsedTicks
'Display additions and subtractions.
Console.WriteLine("{0} adding tick value {1}. item# {2}", ID, ticks, i)
' Don't try to add item after CompleteAdding
' has been called.
If collection.IsAddingCompleted = False Then
collection.Add(ticks)
End If
' Counter for demonstration purposes only.
additions = additions + 1
' Uncomment this line to
' slow down the producer threads without sleeping.
Thread.SpinWait(100000)
Next
Interlocked.Add(totalAdditions, additions)
Console.WriteLine("{0} is done adding: {1} items", ID, additions)
End Sub
Shared Sub RunConsumer()
' GetConsumingEnumerable returns the enumerator for the
' underlying collection.
Dim subtractions As Integer = 0
For Each item In collection.GetConsumingEnumerable
subtractions = subtractions + 1
Console.WriteLine("Consuming tick value {0} : item# {1} : current count = {2}",
item.ToString("D18"), subtractions, collection.Count)
Next
Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
totalAdditions, subtractions, collection.Count())
sw.Stop()
Console.WriteLine("Press any key to exit.")
End Sub
End Class
End Module
namespace EnumerateBlockingCollection
{
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
class Program
{
// Limit the collection size to 2000 items
// at any given time. Set itemsToProduce to >500
// to hit the limit.
const int upperLimit = 1000;
// Adjust this number to see how it impacts
// the producing-consuming pattern.
const int itemsToProduce = 100;
static BlockingCollection<long> collection = new BlockingCollection<long>(upperLimit);
// Variables for diagnostic output only.
static Stopwatch sw = new Stopwatch();
static int totalAdditions = 0;
// Counter for synchronizing producers.
static int producersStillRunning = 2;
static void Main(string[] args)
{
// Start the stopwatch.
sw.Start();
// Queue the Producer threads. Store in an array
// for use with ContinueWhenAll
Task[] producers = new Task[2];
producers[0] = Task.Factory.StartNew(() => RunProducer("A", 0));
producers[1] = Task.Factory.StartNew(() => RunProducer("B", itemsToProduce));
// Create a cleanup task that will call CompleteAdding after
// all producers are done adding items.
Task cleanup = Task.Factory.ContinueWhenAll(producers, (p) => collection.CompleteAdding());
// Queue the Consumer thread. Put this call
// before Parallel.Invoke to begin consuming as soon as
// the producers add items.
Task.Factory.StartNew(() => RunConsumer());
// Keep the console window open while the
// consumer thread completes its output.
Console.ReadKey();
}
static void RunProducer(string ID, int start)
{
int additions = 0;
for (int i = start; i < start + itemsToProduce; i++)
{
// The data that is added to the collection.
long ticks = sw.ElapsedTicks;
// Display additions and subtractions.
Console.WriteLine("{0} adding tick value {1}. item# {2}", ID, ticks, i);
if(!collection.IsAddingCompleted)
collection.Add(ticks);
// Counter for demonstration purposes only.
additions++;
// Uncomment this line to
// slow down the producer threads ing.
Thread.SpinWait(100000);
}
Interlocked.Add(ref totalAdditions, additions);
Console.WriteLine("{0} is done adding: {1} items", ID, additions);
}
static void RunConsumer()
{
// GetConsumingEnumerable returns the enumerator for the
// underlying collection.
int subtractions = 0;
foreach (var item in collection.GetConsumingEnumerable())
{
Console.WriteLine("Consuming tick value {0} : item# {1} : current count = {2}",
item.ToString("D18"), subtractions++, collection.Count);
}
Console.WriteLine("Total added: {0} Total consumed: {1} Current count: {2} ",
totalAdditions, subtractions, collection.Count());
sw.Stop();
Console.WriteLine("Press any key to exit");
}
}
}
この例では、consumer スレッドの BlockingCollection<T>.GetConsumingEnumerable メソッドで foreach ループを使用します。これにより、各アイテムは、列挙されるときにコレクションから削除されます。 System.Collections.Concurrent.BlockingCollection<T> では常に、コレクションに含まれるアイテムの最大数が制限されます。 この方法でコレクションを列挙すると、使用できるアイテムがない場合、またはコレクションが空の場合に、consumer スレッドがブロックされます。 この例では、producer スレッドによるアイテムの追加が、アイテムの使用よりも迅速に行われるため、ブロックは問題にはなりません。
producer スレッドによって追加されたときの順序と同じ順序でアイテムが列挙されるという保証はありません。
コレクションを変更せずに列挙するには、GetConsumingEnumerable メソッドを使用せずに foreach (For Each) を使用します。 ただし、この種の列挙が表しているのは、ある特定の時点のコレクションのスナップショットであることを理解する必要があります。 ループの実行中に他のスレッドがアイテムを同時に追加または削除した場合は、ループがコレクションの実際の状態を表していないことがあります。