尽管可以使用 DataflowBlock.Receive、DataflowBlock.ReceiveAsync 和 DataflowBlock.TryReceive 方法从源块接收消息,但也可以连接消息块来形成一个数据流管道。 数据流管道是一系列组件或“数据流块”,每个组件或数据流块执行一个有助于实现更大目标的特定任务。 数据流管道中的每个数据流块会在收到来自另一数据流块的消息时执行工作。 这就好比是汽车制造装配线。 每辆汽车通过装配线时,一站组装车架,下一站则安装引擎,以此类推。 因为装配线可以同时装配多辆汽车,所以比一次装配整辆车拥有更高的产出。
本文档演示了一个数据流管道,用于从网站上下载书籍《The Iliad of Homer》并搜索文本以将各个单词与反转第一个单词字符的单词相匹配。 本文档中数据流管道的形成包括以下步骤:
连接每个数据流块与管道中的下一个块。 每个块将管道中前一个块的输出作为输入接收。
在 Visual Studio 中,创建 Visual C# 或 Visual Basic“控制台应用程序”项目。 安装 System.Threading.Tasks.Dataflow NuGet 包。
TPL 数据流库(System.Threading.Tasks.Dataflow 命名空间)不随 .NET 一起分发。 若要在 Visual Studio 中安装 System.Threading.Tasks.Dataflow 命名空间,请打开项目,选择“项目”菜单中的“管理 NuGet 包”,再在线搜索 System.Threading.Tasks.Dataflow
包。 或者,若要使用 .NET Core CLI 进行安装,请运行 dotnet add package System.Threading.Tasks.Dataflow
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
static void Main()
// Create the members of the pipeline.
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
Console.WriteLine("Downloading '{0}'...", uri);
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
Console.WriteLine("Found reversed words {0}/{1}",
reversedWord, new string(reversedWord.Reverse().ToArray()));
// Connect the dataflow blocks to form a pipeline.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
// Process "The Iliad of Homer" by Homer.
// Mark the head of the pipeline as complete.
// Wait for the last block in the pipeline to process all messages.
/* Sample output:
Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
Creating word list...
Filtering word list...
Finding reversed words...
Found reversed words doom/mood
Found reversed words draw/ward
Found reversed words aera/area
Found reversed words seat/taes
Found reversed words live/evil
Found reversed words port/trop
Found reversed words sleek/keels
Found reversed words area/aera
Found reversed words tops/spot
Found reversed words evil/live
Found reversed words mood/doom
Found reversed words speed/deeps
Found reversed words moor/room
Found reversed words trop/port
Found reversed words spot/tops
Found reversed words spots/stops
Found reversed words stops/spots
Found reversed words reed/deer
Found reversed words keels/sleek
Found reversed words deeps/speed
Found reversed words deer/reed
Found reversed words taes/seat
Found reversed words room/moor
Found reversed words ward/draw
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
' Create the members of the pipeline.
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
' Connect the dataflow blocks to form a pipeline.
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
' Process "The Iliad of Homer" by Homer.
' Mark the head of the pipeline as complete.
' Wait for the last block in the pipeline to process all messages.
End Sub
End Module
' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw
此示例发送一个通过数据流管道处理的 URL。 如果要通过管道发送多个输入值,可以将并行的形式引入应用程序,这与零件在汽车厂中移动的方式类似。 当管道的第一个成员将其结果发送给第二个成员时,它可以在第二个成员处理第一个结果时并行处理另一个项。
通过使用数据流管道实现的并行称为粗粒度并行,因为它通常由几个较大的任务组成。 此外,你也可以在数据流管道中对短时间运行的较小任务使用粒度较细的并行。 在本示例中,管道的 findReversedWords
成员使用 PLINQ 并行处理工作列表中的多个项。 在粗粒度的管道中使用细粒度并行可以提高总吞吐量。
另外,还可以将数据流块连接到多个目标块,以创建“数据流网络”。 LinkTo 方法采用一个 Predicate<T> 对象,该对象定义了目标块是否根据其值来接受每个消息。 大多数充当源的数据流块类型按目标块连接的顺序向所有已连接的目标块提供消息,直到其中一个块接受此消息。 通过使用此筛选机制,您可以创建已连接数据流块的系统,指示某些数据通过一条路径,其他数据通过另一条路径。 有关使用筛选来创建数据流网络的示例,请参阅演练:在 Windows 窗体应用程序中使用数据流。