共用方式為


如何:在數據流區塊中指定平行處理原則的程度

本文件說明如何設定 ExecutionDataflowBlockOptions.MaxDegreeOfParallelism 屬性,讓執行數據流區塊一次處理多個訊息。 當您擁有執行長時間執行計算的數據流區塊,而且可受益於平行處理訊息時,這樣做會很有用。 這個範例會使用 System.Threading.Tasks.Dataflow.ActionBlock<TInput> 類別同時執行多個數據流作業;不過,您可以在 TPL 資料流連結庫提供的任何預先定義執行區塊類型中指定平行處理原則的最大程度,ActionBlock<TInput>System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>

備註

TPL 資料流連結庫 (System.Threading.Tasks.Dataflow 命名空間) 不會與 .NET 一起散發。 若要在 Visual Studio 中安裝 System.Threading.Tasks.Dataflow 命名空間,請開啟您的專案,從 [專案] 功能表選擇 [管理 NuGet 套件],然後在線搜尋 System.Threading.Tasks.Dataflow 套件。 或者,若要使用 .NET Core CLI安裝它,請執行 dotnet add package System.Threading.Tasks.Dataflow

範例

下列範例會執行兩個數據流計算,並列印每個計算所需的經過時間。 第一個計算指定的最大平行度為 1,這是預設值。 平行處理原則的最大程度上限為 1 會導致數據流區塊以序列方式處理訊息。 第二個計算與第一個計算類似,差別在於它指定的最大平行度為可用處理器的數目。 這可讓數據流區塊平行執行多個作業。

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to specify the maximum degree of parallelism
// when using dataflow.
class Program
{
   // Performs several computations by using dataflow and returns the elapsed
   // time required to perform the computations.
   static TimeSpan TimeDataflowComputations(int maxDegreeOfParallelism,
      int messageCount)
   {
      // Create an ActionBlock<int> that performs some work.
      var workerBlock = new ActionBlock<int>(
         // Simulate work by suspending the current thread.
         millisecondsTimeout => Thread.Sleep(millisecondsTimeout),
         // Specify a maximum degree of parallelism.
         new ExecutionDataflowBlockOptions
         {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
         });

      // Compute the time that it takes for several messages to
      // flow through the dataflow block.

      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      for (int i = 0; i < messageCount; i++)
      {
         workerBlock.Post(1000);
      }
      workerBlock.Complete();

      // Wait for all messages to propagate through the network.
      workerBlock.Completion.Wait();

      // Stop the timer and return the elapsed number of milliseconds.
      stopwatch.Stop();
      return stopwatch.Elapsed;
   }
   static void Main(string[] args)
   {
      int processorCount = Environment.ProcessorCount;
      int messageCount = processorCount;

      // Print the number of processors on this computer.
      Console.WriteLine($"Processor count = {processorCount}.");

      TimeSpan elapsed;

      // Perform two dataflow computations and print the elapsed
      // time required for each.

      // This call specifies a maximum degree of parallelism of 1.
      // This causes the dataflow block to process messages serially.
      elapsed = TimeDataflowComputations(1, messageCount);
      Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " +
         "elapsed time = {2}ms.", 1, messageCount, (int)elapsed.TotalMilliseconds);

      // Perform the computations again. This time, specify the number of
      // processors as the maximum degree of parallelism. This causes
      // multiple messages to be processed in parallel.
      elapsed = TimeDataflowComputations(processorCount, messageCount);
      Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " +
         "elapsed time = {2}ms.", processorCount, messageCount, (int)elapsed.TotalMilliseconds);
   }
}

/* Sample output:
Processor count = 4.
Degree of parallelism = 1; message count = 4; elapsed time = 4032ms.
Degree of parallelism = 4; message count = 4; elapsed time = 1001ms.
*/
Imports System.Diagnostics
Imports System.Threading
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to specify the maximum degree of parallelism 
' when using dataflow.
Friend Class Program
    ' Performs several computations by using dataflow and returns the elapsed
    ' time required to perform the computations.
    Private Shared Function TimeDataflowComputations(ByVal maxDegreeOfParallelism As Integer, ByVal messageCount As Integer) As TimeSpan
        ' Create an ActionBlock<int> that performs some work.
        Dim workerBlock = New ActionBlock(Of Integer)(Function(millisecondsTimeout) Pause(millisecondsTimeout), New ExecutionDataflowBlockOptions() With {.MaxDegreeOfParallelism = maxDegreeOfParallelism})
        ' Simulate work by suspending the current thread.
        ' Specify a maximum degree of parallelism.

        ' Compute the time that it takes for several messages to 
        ' flow through the dataflow block.

        Dim stopwatch As New Stopwatch()
        stopwatch.Start()

        For i As Integer = 0 To messageCount - 1
            workerBlock.Post(1000)
        Next i
        workerBlock.Complete()

        ' Wait for all messages to propagate through the network.
        workerBlock.Completion.Wait()

        ' Stop the timer and return the elapsed number of milliseconds.
        stopwatch.Stop()
        Return stopwatch.Elapsed
    End Function

    Private Shared Function Pause(ByVal obj As Object)
        Thread.Sleep(obj)
        Return Nothing
    End Function
    Shared Sub Main(ByVal args() As String)
        Dim processorCount As Integer = Environment.ProcessorCount
        Dim messageCount As Integer = processorCount

        ' Print the number of processors on this computer.
        Console.WriteLine("Processor count = {0}.", processorCount)

        Dim elapsed As TimeSpan

        ' Perform two dataflow computations and print the elapsed
        ' time required for each.

        ' This call specifies a maximum degree of parallelism of 1.
        ' This causes the dataflow block to process messages serially.
        elapsed = TimeDataflowComputations(1, messageCount)
        Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " & "elapsed time = {2}ms.", 1, messageCount, CInt(Fix(elapsed.TotalMilliseconds)))

        ' Perform the computations again. This time, specify the number of 
        ' processors as the maximum degree of parallelism. This causes
        ' multiple messages to be processed in parallel.
        elapsed = TimeDataflowComputations(processorCount, messageCount)
        Console.WriteLine("Degree of parallelism = {0}; message count = {1}; " & "elapsed time = {2}ms.", processorCount, messageCount, CInt(Fix(elapsed.TotalMilliseconds)))
    End Sub
End Class

' Sample output:
'Processor count = 4.
'Degree of parallelism = 1; message count = 4; elapsed time = 4032ms.
'Degree of parallelism = 4; message count = 4; elapsed time = 1001ms.
'

健全的程式設計

根據預設,每個預先定義的數據流區塊會依照接收訊息的順序來傳播訊息。 雖然當您指定大於 1 的最大平行處理原則程度時,會同時處理多個訊息,但它們仍會依接收的順序傳播。

因為 MaxDegreeOfParallelism 屬性代表平行處理原則的最大程度,因此數據流區塊的執行方式可能會比您指定的平行處理原則少一些。 數據流區塊可以使用較少程度的平行處理原則來符合其功能需求,或考慮缺少可用的系統資源。 數據流區塊絕不會選擇比您指定程度更高的平行性。

另請參閱