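How to: Specify the Degree of Parallelism in a Dataflow Block
This article describes how to set the ExecutionDataflowBlockOptions.MaxDegreeOfParallelism property so that an execution dataflow block processes more than one message at a time. This is useful when a dataflow block performs a long-running computation and can benefit from processing messages in parallel. The example uses the System.Threading.Tasks.Dataflow.ActionBlock<TInput> class to perform multiple dataflow operations concurrently; however, you can specify the maximum degree of parallelism for any of the predefined execution block types that the TPL Dataflow Library provides: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>, and System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.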
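Because the same option applies to every execution block type, one options object can be shared across blocks. The following is a minimal sketch, not part of the original sample: the class name SharedOptionsSketch and the method SquareAll are hypothetical names chosen for illustration. It links a TransformBlock<int,int> to an ActionBlock<int> and passes the same ExecutionDataflowBlockOptions instance to both.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper for illustration; not part of the original sample.
public static class SharedOptionsSketch
{
    // Squares each input in a TransformBlock and collects the results
    // in an ActionBlock. Both blocks use the same parallelism setting.
    public static int[] SquareAll(int[] inputs, int maxDegreeOfParallelism)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
        };

        // The collecting action may run on several threads at once,
        // so a thread-safe collection is used.
        var results = new ConcurrentBag<int>();

        var square = new TransformBlock<int, int>(n => n * n, options);
        var collect = new ActionBlock<int>(n => results.Add(n), options);

        // Propagate completion so that completing the head block
        // eventually completes the tail block.
        square.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (int n in inputs)
        {
            square.Post(n);
        }
        square.Complete();
        collect.Completion.Wait();

        // ConcurrentBag does not preserve insertion order, so sort
        // the results before returning them.
        return results.OrderBy(n => n).ToArray();
    }

    public static void Main()
    {
        int[] squares = SquareAll(new[] { 1, 2, 3, 4 }, Environment.ProcessorCount);
        Console.WriteLine(string.Join(", ", squares)); // prints "1, 4, 9, 16"
    }
}
```

The results are sorted only because the collecting bag is unordered; the sort is a property of this sketch, not of the dataflow blocks.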
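Note
The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, to install it by using the .NET Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.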
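Example
The following example performs two dataflow computations and prints the elapsed time required for each. The first computation specifies a maximum degree of parallelism of 1, which is the default. A maximum degree of parallelism of 1 causes the dataflow block to process messages serially. The second computation resembles the first, except that it specifies a maximum degree of parallelism equal to the number of available processors. This enables the dataflow block to perform multiple operations in parallel.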
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to specify the maximum degree of parallelism
// when using dataflow.
class Program
{
    // Performs several computations by using dataflow and returns the elapsed
    // time required to perform the computations.
    static TimeSpan TimeDataflowComputations(int maxDegreeOfParallelism,
        int messageCount)
    {
        // Create an ActionBlock<int> that performs some work.
        var workerBlock = new ActionBlock<int>(
            // Simulate work by suspending the current thread.
            millisecondsTimeout => Thread.Sleep(millisecondsTimeout),
            // Specify a maximum degree of parallelism.
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxDegreeOfParallelism
            });

        // Compute the time that it takes for several messages to
        // flow through the dataflow block.
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();

        for (int i = 0; i < messageCount; i++)
        {
            workerBlock.Post(1000);
        }
        workerBlock.Complete();

        // Wait for all messages to propagate through the network.
        workerBlock.Completion.Wait();

        // Stop the timer and return the elapsed time.
        stopwatch.Stop();
        return stopwatch.Elapsed;
    }

    static void Main(string[] args)
    {
        int processorCount = Environment.ProcessorCount;
        int messageCount = processorCount;

        // Print the number of processors on this computer.
        Console.WriteLine("Processor count = {0}.", processorCount);

        TimeSpan elapsed;

        // Perform two dataflow computations and print the elapsed
        // time required for each.

        // This call specifies a maximum degree of parallelism of 1.
        // This causes the dataflow block to process messages serially.
        elapsed = TimeDataflowComputations(1, messageCount);
        Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " +
            "elapsed time = {2}ms.", 1, messageCount, (int)elapsed.TotalMilliseconds);

        // Perform the computations again. This time, specify the number of
        // processors as the maximum degree of parallelism. This causes
        // multiple messages to be processed in parallel.
        elapsed = TimeDataflowComputations(processorCount, messageCount);
        Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " +
            "elapsed time = {2}ms.", processorCount, messageCount, (int)elapsed.TotalMilliseconds);
    }
}

/* Sample output:
Processor count = 4.
Degree of parallelism = 1; message count = 4; elapsed time = 4032ms.
Degree of parallelism = 4; message count = 4; elapsed time = 1001ms.
*/
Imports System.Diagnostics
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to specify the maximum degree of parallelism
' when using dataflow.
Friend Class Program
    ' Performs several computations by using dataflow and returns the elapsed
    ' time required to perform the computations.
    Private Shared Function TimeDataflowComputations(ByVal maxDegreeOfParallelism As Integer, ByVal messageCount As Integer) As TimeSpan
        ' Create an ActionBlock(Of Integer) that performs some work.
        ' The work is simulated by suspending the current thread, and the
        ' options object specifies the maximum degree of parallelism.
        Dim workerBlock = New ActionBlock(Of Integer)(
            Sub(millisecondsTimeout) Thread.Sleep(millisecondsTimeout),
            New ExecutionDataflowBlockOptions() With {.MaxDegreeOfParallelism = maxDegreeOfParallelism})

        ' Compute the time that it takes for several messages to
        ' flow through the dataflow block.
        Dim stopwatch As New Stopwatch()
        stopwatch.Start()

        For i As Integer = 0 To messageCount - 1
            workerBlock.Post(1000)
        Next i
        workerBlock.Complete()

        ' Wait for all messages to propagate through the network.
        workerBlock.Completion.Wait()

        ' Stop the timer and return the elapsed time.
        stopwatch.Stop()
        Return stopwatch.Elapsed
    End Function

    Shared Sub Main(ByVal args() As String)
        Dim processorCount As Integer = Environment.ProcessorCount
        Dim messageCount As Integer = processorCount

        ' Print the number of processors on this computer.
        Console.WriteLine("Processor count = {0}.", processorCount)

        Dim elapsed As TimeSpan

        ' Perform two dataflow computations and print the elapsed
        ' time required for each.

        ' This call specifies a maximum degree of parallelism of 1.
        ' This causes the dataflow block to process messages serially.
        elapsed = TimeDataflowComputations(1, messageCount)
        Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " & "elapsed time = {2}ms.", 1, messageCount, CInt(Fix(elapsed.TotalMilliseconds)))

        ' Perform the computations again. This time, specify the number of
        ' processors as the maximum degree of parallelism. This causes
        ' multiple messages to be processed in parallel.
        elapsed = TimeDataflowComputations(processorCount, messageCount)
        Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " & "elapsed time = {2}ms.", processorCount, messageCount, CInt(Fix(elapsed.TotalMilliseconds)))
    End Sub
End Class

' Sample output:
'Processor count = 4.
'Degree of parallelism = 1; message count = 4; elapsed time = 4032ms.
'Degree of parallelism = 4; message count = 4; elapsed time = 1001ms.
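Robust Programming
By default, each predefined dataflow block propagates messages in the order in which it receives them. Although multiple messages are processed simultaneously when you specify a maximum degree of parallelism greater than 1, they are still propagated in the order in which they were received.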
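The ordering behavior can be observed with a small experiment. The following sketch is an illustration under stated assumptions rather than part of the original sample: the class OrderingSketch, the method RunInOrder, and the 20 ms delay step are all hypothetical choices. It makes earlier messages take longer than later ones, so later messages finish processing first, and then records the order in which the block emits its output.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper for illustration; not part of the original sample.
public static class OrderingSketch
{
    // Posts 0..count-1 to a parallel TransformBlock and returns the
    // outputs in the order in which the block emits them.
    public static List<int> RunInOrder(int count, int maxDegreeOfParallelism)
    {
        var block = new TransformBlock<int, int>(n =>
        {
            // Earlier messages sleep longer, so when several messages
            // run in parallel the later ones finish first.
            Thread.Sleep((count - n) * 20);
            return n;
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
        });

        for (int i = 0; i < count; i++)
        {
            block.Post(i);
        }
        block.Complete();

        // Drain the block until it reports that no more output will arrive.
        var outputs = new List<int>();
        while (block.OutputAvailableAsync().Result)
        {
            while (block.TryReceive(out int item))
            {
                outputs.Add(item);
            }
        }
        return outputs;
    }

    public static void Main()
    {
        List<int> outputs = RunInOrder(4, 4);
        Console.WriteLine(string.Join(", ", outputs)); // prints "0, 1, 2, 3"
    }
}
```

If output order does not matter, the ExecutionDataflowBlockOptions.EnsureOrdered property can be set to false, which lets the block emit each result as soon as it is ready.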
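Because the MaxDegreeOfParallelism property represents the maximum degree of parallelism, the dataflow block might execute with a lesser degree of parallelism than you specify. The dataflow block might use a lesser degree of parallelism to meet its functional requirements or because there is a lack of available system resources. A dataflow block never chooses a greater degree of parallelism than you specify.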
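One way to see that the setting is an upper bound is to count how many delegate invocations run at the same moment. The following sketch assumes a hypothetical class PeakConcurrencySketch with a hypothetical method MeasurePeak; neither appears in the original sample. It tracks the highest number of concurrently running handlers. The observed peak depends on the machine and the thread pool, so it can be lower than the configured value, but it should not exceed it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper for illustration; not part of the original sample.
public static class PeakConcurrencySketch
{
    // Returns the highest number of handler invocations that were
    // observed running at the same time.
    public static int MeasurePeak(int maxDegreeOfParallelism, int messageCount)
    {
        object gate = new object();
        int running = 0;
        int peak = 0;

        var block = new ActionBlock<int>(_ =>
        {
            // Count this invocation and record a new peak if needed.
            lock (gate)
            {
                running++;
                if (running > peak)
                {
                    peak = running;
                }
            }

            // Simulate work so that invocations have a chance to overlap.
            Thread.Sleep(50);

            lock (gate)
            {
                running--;
            }
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
        });

        for (int i = 0; i < messageCount; i++)
        {
            block.Post(i);
        }
        block.Complete();
        block.Completion.Wait();

        return peak;
    }

    public static void Main()
    {
        int limit = Environment.ProcessorCount;
        int peak = MeasurePeak(limit, limit * 4);
        Console.WriteLine("Configured maximum = {0}; observed peak = {1}.", limit, peak);
    }
}
```

A lock is used instead of interlocked operations to keep the counting logic easy to read; the cost is negligible next to the simulated work.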