BlockingCollection 概述
BlockingCollection<T> 是一个线程安全集合类,可提供以下功能:
实现制造者-使用者模式。
通过多线程并发添加和获取项。
可选最大容量。
集合为空或已满时通过插入和移除操作进行阻塞。
插入和移除“尝试”操作不发生阻塞,或在指定时间段内发生阻塞。
封装实现 IProducerConsumerCollection<T> 的任何集合类型
使用取消标记执行取消操作。
支持使用
foreach
(在 Visual Basic 中,使用For Each
)的两种枚举:只读枚举。
在枚举项时将项移除的枚举。
限制和阻塞支持
BlockingCollection<T> 支持限制和阻塞。 限制意味着可以设置集合的最大容量。 限制在某些情况中很重要,因为它使你能够控制内存中的集合的最大大小,并可阻止制造线程移动到离使用线程前方太远的位置。
多个线程或任务可同时向集合添加项,如果集合达到其指定最大容量,则制造线程将发生阻塞,直到移除集合中的某个项。 多个使用者可以同时移除项,如果集合变空,则使用线程将发生阻塞,直到制造者添加某个项。 制造线程可调用 CompleteAdding 来指示不再添加项。 使用者将监视 IsCompleted 属性以了解集合何时为空且不再添加项。 下面的示例展示了容量上限为 100 的简单 BlockingCollection。 只要满足某些外部条件为 true,制造者任务就会向集合添加项,然后调用 CompleteAdding。 使用者任务获取项,直到 IsCompleted 属性为 true。
// A bounded collection. It can hold no more
// than 100 items at once.
BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);
// A simple blocking consumer with no cancellation.
Task.Run(() =>
{
while (!dataItems.IsCompleted)
{
Data data = null;
// Blocks if dataItems.Count == 0.
// IOE means that Take() was called on a completed collection.
// Some other thread can call CompleteAdding after we pass the
// IsCompleted check but before we call Take.
// In this example, we can simply catch the exception since the
// loop will break on the next iteration.
try
{
data = dataItems.Take();
}
catch (InvalidOperationException) { }
if (data != null)
{
Process(data);
}
}
Console.WriteLine("\r\nNo more items to take.");
});
// A simple blocking producer with no cancellation.
Task.Run(() =>
{
while (moreItemsToAdd)
{
Data data = GetData();
// Blocks if numbers.Count == dataItems.BoundedCapacity
dataItems.Add(data);
}
// Let consumer know we are done.
dataItems.CompleteAdding();
});
' A bounded collection. It can hold no more
' than 100 items at once.
Dim dataItems = New BlockingCollection(Of Data)(100)
' A simple blocking consumer with no cancellation.
Task.Factory.StartNew(Sub()
While dataItems.IsCompleted = False
Dim dataItem As Data = Nothing
Try
dataItem = dataItems.Take()
Catch e As InvalidOperationException
' IOE means that Take() was called on a completed collection.
' In this example, we can simply catch the exception since the
' loop will break on the next iteration.
End Try
If (dataItem IsNot Nothing) Then
Process(dataItem)
End If
End While
Console.WriteLine(vbCrLf & "No more items to take.")
End Sub)
' A simple blocking producer with no cancellation.
Task.Factory.StartNew(Sub()
While moreItemsToAdd = True
Dim item As Data = GetData()
' Blocks if dataItems.Count = dataItems.BoundedCapacity.
dataItems.Add(item)
End While
' Let consumer know we are done.
dataItems.CompleteAdding()
End Sub)
有关完整的示例,请参阅 如何:在 BlockingCollection 中逐个添加和取出项。
计时阻塞操作
在针对有限集合的计时阻塞 TryAdd 和 TryTake 操作中,此方法将尝试添加或取出某个项。 如果项可用,项会被置于通过引用传入的变量中,然后方法返回 true。 如果在指定的超时期限过后未检索到任何项,方法返回 false。 相应线程可以任意执行一些其他有用的工作,然后再重新尝试访问该集合。 有关计时阻塞访问的示例,请参阅如何:在 BlockingCollection 中逐个添加和取出项中的第二个示例。
取消添加和取出操作
添加和取出操作通常会在一个循环内执行。 可以通过以下方法来取消循环:向 TryAdd 或 TryTake 方法传入 CancellationToken,然后在每次迭代时检查该标记的 IsCancellationRequested 属性的值。 如果值为 true,由你自行决定是否通过清理所有资源并退出循环来响应取消请求。 下面的示例演示获取取消标记和使用该标记的代码的 TryAdd 重载:
do
{
// Cancellation causes OCE. We know how to handle it.
try
{
success = bc.TryAdd(itemToAdd, 2, ct);
}
catch (OperationCanceledException)
{
bc.CompleteAdding();
break;
}
//...
} while (moreItems == true);
Do While moreItems = True
' Cancellation causes OCE. We know how to handle it.
Try
success = bc.TryAdd(itemToAdd, 2, ct)
Catch ex As OperationCanceledException
bc.CompleteAdding()
Exit Do
End Try
Loop
有关如何添加取消支持的示例,请参阅如何:在 BlockingCollection 中逐个添加和取出项中的第二个示例。
指定集合类型
创建 BlockingCollection<T> 时,不仅可以指定上限容量,而且可以指定要使用的集合类型。 例如,可为先进先出 (FIFO) 行为指定 ConcurrentQueue<T>,也可为后进先出 (LIFO) 行为指定 ConcurrentStack<T>。 可使用实现 IProducerConsumerCollection<T> 接口的任何集合类。 BlockingCollection<T> 的默认集合类型为 ConcurrentQueue<T>。 下面的代码示例演示如何创建字符串的 BlockingCollection<T>,其容量为 1000 并使用 ConcurrentBag<T>:
Dim bc = New BlockingCollection(Of String)(New ConcurrentBag(Of String()), 1000)
BlockingCollection<string> bc = new BlockingCollection<string>(new ConcurrentBag<string>(), 1000 );
有关详细信息,请参阅如何:向集合添加限制和阻塞功能。
IEnumerable 支持
BlockingCollection<T> 提供 GetConsumingEnumerable 方法,该方法允许使用者使用 foreach
(在 Visual Basic 中为 For Each
)删除项直到完成集合,也就是说,集合为空且不再添加项。 有关详细信息,请参阅如何:使用 ForEach 移除 BlockingCollection 中的项。
将多个 BlockingCollection 作为整体使用
在使用者需要同时取出多个集合中的项的情况下,可以创建 BlockingCollection<T> 的数组并使用静态方法,如 TakeFromAny 和 AddToAny 方法,这两个方法可以在该数组的任意集合中添加或取出项。 如果一个集合发生阻塞,此方法会立即尝试其他集合,直到找到能够执行该操作的集合。 有关详细信息,请参阅如何:在管道中使用阻塞集合的数组。