演练:创建数据流管道
尽管可以使用 DataflowBlock.Receive、DataflowBlock.ReceiveAsync 和 DataflowBlock.TryReceive 方法从源块接收消息,但也可以连接消息块来形成一个数据流管道。 数据流管道是一系列组件或“数据流块”,每个组件或数据流块执行一个有助于实现更大目标的特定任务。 数据流管道中的每个数据流块会在收到来自另一数据流块的消息时执行工作。 这就好比是汽车制造装配线。 每辆汽车通过装配线时,一站组装车架,下一站则安装引擎,以此类推。 因为装配线可以同时装配多辆汽车,所以比一次装配整辆车拥有更高的产出。
本文档演示了一个数据流管道,用于从网站上下载书籍《The Iliad of Homer》并搜索文本以将各个单词与反转第一个单词字符的单词相匹配。 本文档中数据流管道的形成包括以下步骤:
创建参与管道的数据流块。
连接每个数据流块与管道中的下一个块。 每个块将管道中前一个块的输出作为输入接收。
对每个数据流块,创建一个延续任务,该延续任务在上一个块完成后将下一个块的状态设置为已完成状态。
将数据发布到管道的开头。
将管道的开头标记为已完成。
等待管道完成所有工作。
先决条件
开始本演练之前,请阅读数据流。
创建控制台应用程序
在 Visual Studio 中,创建 Visual C# 或 Visual Basic“控制台应用程序”项目。 安装 System.Threading.Tasks.Dataflow NuGet 包。
注意
TPL 数据流库(System.Threading.Tasks.Dataflow 命名空间)不随 .NET 一起分发。 若要在 Visual Studio 中安装 System.Threading.Tasks.Dataflow 命名空间,请打开项目,选择“项目”菜单中的“管理 NuGet 包”,再在线搜索 System.Threading.Tasks.Dataflow
包。 或者,若要使用 .NET Core CLI 进行安装,请运行 dotnet add package System.Threading.Tasks.Dataflow
。
将以下代码添加到项目中以创建基本应用程序。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
static void Main()
{
}
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
End Sub
End Module
创建数据流块
将以下代码添加到 Main
方法以创建参与管道的数据流块。 下表总结了管道的每个成员的角色。
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine("Downloading '{0}'...", uri);
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine("Found reversed words {0}/{1}",
reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
成员 | 类型 | 描述 |
---|---|---|
downloadString |
TransformBlock<TInput,TOutput> | 从 Web 下载该书的文本。 |
createWordList |
TransformBlock<TInput,TOutput> | 将该书的文本分成单词的数组。 |
filterWordList |
TransformBlock<TInput,TOutput> | 删除短词和单词数组中的重复项。 |
findReversedWords |
TransformManyBlock<TInput,TOutput> | 查找经过筛选的单词数组集合中所有将字母反转后仍在单词数组中出现的单词。 |
printReversedWords |
ActionBlock<TInput> | 向控制台显示单词及对应的倒序词。 |
虽然可以将本示例中数据流管道的多个步骤合并为一个步骤,不过本示例阐释了组合多个独立数据流任务来执行较大任务的概念。 示例通过使用 TransformBlock<TInput,TOutput> 使管道的每个成员能对其输入数据执行操作并将结果发送到管道中的下一步骤。 管道的 findReversedWords
成员是一个 TransformManyBlock<TInput,TOutput> 对象,因为该成员会为每个输入生成多个独立输出。 管道的结尾 printReversedWords
是一个 ActionBlock<TInput> 对象,因为它会对其输入执行一个操作,但不产生结果。
形成管道
添加以下代码将每个块与管道中的下一个块连接。
当您调用 LinkTo 方法将源数据流块连接到目标数据流块时,源数据流块会在数据可用时将数据传播到目标块。 如果你还提供 DataflowLinkOptions,并将 PropagateCompletion 设置为 true,则在管道中成功或未成功完成一个块都将导致管道中下一个块的完成。
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
将数据发布到管道
添加以下代码,以将《The Iliad of Homer》一书的 URL 发布到数据流管道的开头。
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
此示例使用 DataflowBlock.Post 将数据同步发送到管道的开头。 在必须将数据异步发送到数据流节点时,请使用 DataflowBlock.SendAsync 方法。
完成管道活动
添加以下代码将管道的开头标记为已完成。 管道的开头在处理了所有缓冲的消息后传播其完成。
// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()
此示例通过要处理的数据流管道发送一个 URL。 如果要通过管道发送多个输入,请在提交了所有输入后调用 IDataflowBlock.Complete 方法。 如果您的应用程序没有表示数据不再可用或应用程序不必等待管道完成的定义完善的点,则可以忽略此步骤。
等待管道完成
添加以下代码以等待管道完成。 管道的结尾完成时完成整个操作。
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
可以同时等待任一线程或多个线程的数据流完成。
完整示例
下面的示例显示此演练的完整代码。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
static void Main()
{
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine("Downloading '{0}'...", uri);
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine("Found reversed words {0}/{1}",
reversedWord, new string(reversedWord.Reverse().ToArray()));
});
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
// Mark the head of the pipeline as complete.
downloadString.Complete();
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
}
}
/* Sample output:
Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
Creating word list...
Filtering word list...
Finding reversed words...
Found reversed words doom/mood
Found reversed words draw/ward
Found reversed words aera/area
Found reversed words seat/taes
Found reversed words live/evil
Found reversed words port/trop
Found reversed words sleek/keels
Found reversed words area/aera
Found reversed words tops/spot
Found reversed words evil/live
Found reversed words mood/doom
Found reversed words speed/deeps
Found reversed words moor/room
Found reversed words trop/port
Found reversed words spot/tops
Found reversed words spots/stops
Found reversed words stops/spots
Found reversed words reed/deer
Found reversed words keels/sleek
Found reversed words deeps/speed
Found reversed words deer/reed
Found reversed words taes/seat
Found reversed words room/moor
Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
' Mark the head of the pipeline as complete.
downloadString.Complete()
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
End Sub
End Module
' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw
后续步骤
此示例发送一个通过数据流管道处理的 URL。 如果要通过管道发送多个输入值,可以将并行的形式引入应用程序,这与零件在汽车厂中移动的方式类似。 当管道的第一个成员将其结果发送给第二个成员时,它可以在第二个成员处理第一个结果时并行处理另一个项。
通过使用数据流管道实现的并行称为粗粒度并行,因为它通常由几个较大的任务组成。 此外,你也可以在数据流管道中对短时间运行的较小任务使用粒度较细的并行。 在本示例中,管道的 findReversedWords
成员使用 PLINQ 并行处理工作列表中的多个项。 在粗粒度的管道中使用细粒度并行可以提高总吞吐量。
另外,还可以将数据流块连接到多个目标块,以创建“数据流网络”。 LinkTo 方法采用一个 Predicate<T> 对象,该对象定义了目标块是否根据其值来接受每个消息。 大多数充当源的数据流块类型按目标块连接的顺序向所有已连接的目标块提供消息,直到其中一个块接受此消息。 通过使用此筛选机制,您可以创建已连接数据流块的系统,指示某些数据通过一条路径,其他数据通过另一条路径。 有关使用筛选来创建数据流网络的示例,请参阅演练:在 Windows 窗体应用程序中使用数据流。