Walkthrough: Creating a Dataflow Pipeline
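Although you can use the DataflowBlock.Receive, DataflowBlock.ReceiveAsync, and DataflowBlock.TryReceive methods to receive messages from source blocks, you can also connect message blocks to form a dataflow pipeline. A dataflow pipeline is a series of components, or dataflow blocks, each of which performs a specific task that contributes to a larger goal. Every dataflow block in a dataflow pipeline starts its work when it receives a message from another dataflow block. An analogy is an assembly line for automobile manufacturing: as each vehicle passes through the line, one station assembles the frame, the next one installs the engine, and so on. Because an assembly line can work on multiple vehicles at the same time, it provides better throughput than building complete vehicles one at a time.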
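This document demonstrates a dataflow pipeline that downloads the book The Iliad of Homer from a website and searches the text for words whose reversed spelling is also a word in the book, such as doom and mood. The dataflow pipeline in this document is formed by the following steps: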
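Create the dataflow blocks that participate in the pipeline.
Connect each dataflow block to the next block in the pipeline. Each block receives the output of the previous block in the pipeline as its input.
Arrange for each dataflow block to be set to the completed state after the block before it finishes. This walkthrough does so by setting PropagateCompletion to true when it links the blocks, rather than by creating a continuation task for each block.
Post data to the head of the pipeline.
Mark the head of the pipeline as completed.
Wait for the pipeline to complete all work.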
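The six steps can be sketched in a few lines before you build the full book-processing pipeline. The following minimal C# sketch is illustrative only: the class name MiniPipeline, the squaring stage, and the ConcurrentQueue used to collect results are assumptions made for this example and are not part of the walkthrough code. Each step is marked with a comment.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical two-block pipeline that follows the six steps of the walkthrough.
public static class MiniPipeline
{
    public static async Task<List<int>> RunAsync(IEnumerable<int> inputs)
    {
        var results = new ConcurrentQueue<int>();

        // Step 1: create the blocks that participate in the pipeline.
        var square = new TransformBlock<int, int>(n => n * n);
        var collect = new ActionBlock<int>(n => results.Enqueue(n));

        // Steps 2 and 3: link the blocks and let completion flow downstream.
        square.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        // Step 4: post data to the head of the pipeline.
        foreach (int n in inputs)
            square.Post(n);

        // Step 5: mark the head of the pipeline as completed.
        square.Complete();

        // Step 6: wait for the tail of the pipeline to finish.
        await collect.Completion;

        return results.ToList();
    }

    static void Main()
    {
        List<int> squares = RunAsync(new[] { 1, 2, 3 }).Result;
        Console.WriteLine(string.Join(", ", squares)); // 1, 4, 9
    }
}
```

Both blocks keep the default MaxDegreeOfParallelism of 1, so the squares come out in the same order as the inputs.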
Prerequisites
Before you start this walkthrough, read Dataflow.
Create a Console Application
In Visual Studio, create a Visual C# or Visual Basic Console Application project. Install the System.Threading.Tasks.Dataflow NuGet package.
Note
The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, to install it by using the .NET Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.
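Add the following code to your project to create the basic application. The C# version appears first, followed by the Visual Basic version.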
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
    static void Main()
    {
    }
}
```
```vb
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module
```
Creating the Dataflow Blocks
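Add the following code to the Main method to create the dataflow blocks that participate in the pipeline (C# first, then Visual Basic). The table after the code summarizes the role of each member of the pipeline.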
```csharp
//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
    Console.WriteLine("Downloading '{0}'...", uri);

    return await new HttpClient(new HttpClientHandler { AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
    Console.WriteLine("Creating word list...");

    // Remove common punctuation by replacing all non-letter characters
    // with a space character.
    char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
    text = new string(tokens);

    // Separate the text into an array of words.
    return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
    Console.WriteLine("Filtering word list...");

    return words
        .Where(word => word.Length > 3)
        .Distinct()
        .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
    Console.WriteLine("Finding reversed words...");

    var wordsSet = new HashSet<string>(words);

    return from word in words.AsParallel()
           let reverse = new string(word.Reverse().ToArray())
           where word != reverse && wordsSet.Contains(reverse)
           select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
    Console.WriteLine("Found reversed words {0}/{1}",
        reversedWord, new string(reversedWord.Reverse().ToArray()));
});
```
```vb
'
' Create the members of the pipeline.
'

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
    Function(text)
        Console.WriteLine("Creating word list...")

        ' Remove common punctuation by replacing all non-letter characters
        ' with a space character.
        Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
        text = New String(tokens)

        ' Separate the text into an array of words.
        Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
    End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
    Function(words)
        Console.WriteLine("Filtering word list...")

        Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
    End Function)

' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
    Function(words)
        Console.WriteLine("Finding reversed words...")

        Dim wordsSet = New HashSet(Of String)(words)

        Return From word In words.AsParallel()
               Let reverse = New String(word.Reverse().ToArray())
               Where word <> reverse AndAlso wordsSet.Contains(reverse)
               Select word
    End Function)

' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
    Sub(reversedWord)
        Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
    End Sub)
```
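Member | Type | Description
---|---|---
downloadString | TransformBlock<TInput,TOutput> | Downloads the book text from the Web.
createWordList | TransformBlock<TInput,TOutput> | Separates the book text into an array of words.
filterWordList | TransformBlock<TInput,TOutput> | Removes short words (three characters or fewer) and duplicates from the word array.
findReversedWords | TransformManyBlock<TInput,TOutput> | Searches the filtered word array for all words whose reversed spelling also occurs in the array.
printReversedWords | ActionBlock<TInput> | Displays each word and its corresponding reversed word on the console.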
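Because the delegates for createWordList and filterWordList are ordinary functions, one way to check their behavior is to run the same logic outside a dataflow block. The following C# sketch copies their bodies into static methods; the class name WordListStages, the method names, and the sample sentence are hypothetical.

```csharp
using System;
using System.Linq;

public static class WordListStages
{
    // Same logic as createWordList: replace every non-letter character with a
    // space, then split on spaces and drop the empty entries.
    public static string[] CreateWordList(string text)
    {
        char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
        return new string(tokens).Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
    }

    // Same logic as filterWordList: keep words longer than three characters
    // and remove duplicates (the comparison is case-sensitive).
    public static string[] FilterWordList(string[] words) =>
        words.Where(word => word.Length > 3).Distinct().ToArray();

    static void Main()
    {
        string[] words = CreateWordList("Sing, O goddess, the anger; the anger!");
        Console.WriteLine(string.Join(" | ", words));                 // Sing | O | goddess | the | anger | the | anger
        Console.WriteLine(string.Join(" | ", FilterWordList(words))); // Sing | goddess | anger
    }
}
```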
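Although you could combine multiple steps of the dataflow pipeline in this example into one step, the example illustrates the concept of composing multiple independent dataflow tasks to perform a larger task. The example uses TransformBlock<TInput,TOutput> so that each member of the pipeline can perform an operation on its input data and send the result to the next step in the pipeline. The findReversedWords member of the pipeline is a TransformManyBlock<TInput,TOutput> object because it produces multiple independent outputs for each input. The tail of the pipeline, printReversedWords, is an ActionBlock<TInput> object because it performs an action on its input but does not produce a result.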
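To see how a TransformManyBlock<TInput,TOutput> differs from a TransformBlock<TInput,TOutput> in isolation, the following C# sketch feeds two lines of text into a TransformManyBlock that, like findReversedWords, emits several output messages for each input message. The class name FanOutDemo and the sample lines are hypothetical. The ActionBlock<TInput> at the end plays the same role as printReversedWords: it consumes messages and produces nothing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo: one input line fans out into many word messages.
public static class FanOutDemo
{
    public static async Task<List<string>> SplitLinesAsync(IEnumerable<string> lines)
    {
        var words = new List<string>();

        // Each element of the array returned by the delegate becomes a
        // separate output message.
        var splitLine = new TransformManyBlock<string, string>(
            line => line.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries));

        // Consumes messages and produces no output. With the default
        // MaxDegreeOfParallelism of 1, List<T>.Add is never called concurrently.
        var collect = new ActionBlock<string>(word => words.Add(word));

        splitLine.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (string line in lines)
            splitLine.Post(line);
        splitLine.Complete();

        await collect.Completion;
        return words;
    }

    static void Main()
    {
        List<string> words = SplitLinesAsync(new[] { "doom and mood", "live evil" }).Result;
        Console.WriteLine($"2 input messages -> {words.Count} output messages");
        Console.WriteLine(string.Join(" ", words));
    }
}
```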
Forming the Pipeline
Add the following code to connect each block to the next block in the pipeline.
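When you call the LinkTo method to connect a source dataflow block to a target dataflow block, the source block propagates data to the target block as data becomes available. If you also provide DataflowLinkOptions with PropagateCompletion set to true, then when a block in the pipeline completes, whether successfully or with a fault, the next block in the pipeline also completes.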
```csharp
//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
```
```vb
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
```
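The effect of PropagateCompletion is easiest to see when a block faults. In the following C# sketch (the class name and the int.Parse stage are hypothetical), the head block throws a FormatException on non-numeric input. With PropagateCompletion set to true, the fault reaches the tail block, whose Completion task ends in the Faulted state. With it set to false, the tail is never told that the head has finished, so the sketch bounds its wait with a one-second timeout instead of blocking forever.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionPropagationDemo
{
    // Reports whether the tail block completed, and whether it faulted,
    // after waiting at most one second.
    public static async Task<(bool Completed, bool Faulted)> RunAsync(bool propagateCompletion)
    {
        // int.Parse throws FormatException for non-numeric input, which faults 'parse'.
        var parse = new TransformBlock<string, int>(s => int.Parse(s));
        var print = new ActionBlock<int>(n => Console.WriteLine($"Parsed {n}"));

        parse.LinkTo(print, new DataflowLinkOptions { PropagateCompletion = propagateCompletion });

        parse.Post("42");
        parse.Post("not a number");
        parse.Complete();

        // Without propagation, print.Completion never finishes, so bound the wait.
        await Task.WhenAny(print.Completion, Task.Delay(TimeSpan.FromSeconds(1)));
        return (print.Completion.IsCompleted, print.Completion.IsFaulted);
    }

    static void Main()
    {
        Console.WriteLine($"With propagation:    {RunAsync(true).Result}");
        Console.WriteLine($"Without propagation: {RunAsync(false).Result}");
    }
}
```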
Posting Data to the Pipeline
Add the following code to post the URL of the book The Iliad of Homer to the head of the dataflow pipeline.
```csharp
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
```
```vb
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
```
This example uses DataflowBlock.Post to send data synchronously to the head of the pipeline. Use the DataflowBlock.SendAsync method when you must send data asynchronously to a dataflow node.
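The practical difference between the two methods shows up when a target can decline a message, for example because its buffer is full. Post returns false immediately in that case, while SendAsync returns a Task<bool> that stays pending until the target accepts or permanently declines the message. The following C# sketch illustrates this with a BufferBlock<T> limited to one message; the capacity of 1 and the class name are assumptions chosen only to force the full-buffer case.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PostVersusSendAsync
{
    public static async Task<(bool Posted, int First, bool Sent)> RunAsync()
    {
        // A buffer that can hold only one message at a time.
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1 });

        buffer.Post(1);               // accepted: the buffer was empty
        bool posted = buffer.Post(2); // declined at once: the buffer is full

        // SendAsync does not give up; the offer is postponed until space frees up.
        Task<bool> send = buffer.SendAsync(3);

        int first = await buffer.ReceiveAsync(); // removes 1 and frees the slot
        bool sent = await send;                  // the postponed 3 is now accepted
        return (posted, first, sent);
    }

    static void Main()
    {
        var (posted, first, sent) = RunAsync().Result;
        Console.WriteLine($"Post(2) accepted: {posted}; received: {first}; SendAsync(3) accepted: {sent}");
    }
}
```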
Completing Pipeline Activity
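Add the following code to mark the head of the pipeline as completed. The head of the pipeline propagates its completion after it processes all buffered messages.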
```csharp
// Mark the head of the pipeline as complete.
downloadString.Complete();
```
```vb
' Mark the head of the pipeline as complete.
downloadString.Complete()
```
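This example sends one URL through the dataflow pipeline to be processed. If you send more than one input through a pipeline, call the IDataflowBlock.Complete method after you submit all the input. You can omit this step if your application has no well-defined point at which data is no longer available, or if the application does not need to wait for the pipeline to finish.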
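Calling Complete only after the last input matters because a completed block declines every message offered to it afterward. The following C# sketch (the class name and the example.com URLs are hypothetical, and nothing is downloaded) posts several inputs, completes the head once, and then shows that a late Post call returns false.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompleteAfterAllInput
{
    public static async Task<(int Processed, bool LatePostAccepted)> RunAsync(string[] urls)
    {
        int processed = 0;

        // Stand-in for the head of a pipeline; it only counts the URLs it receives.
        var head = new ActionBlock<string>(url => Interlocked.Increment(ref processed));

        // Submit all of the input first...
        foreach (string url in urls)
            head.Post(url);

        // ...then mark the head as completed exactly once.
        head.Complete();

        // Anything posted after Complete is declined.
        bool lateAccepted = head.Post("https://example.com/late");

        await head.Completion;
        return (processed, lateAccepted);
    }

    static void Main()
    {
        var urls = new[] { "https://example.com/a", "https://example.com/b", "https://example.com/c" };
        var (processed, lateAccepted) = RunAsync(urls).Result;
        Console.WriteLine($"Processed {processed} URLs; late post accepted: {lateAccepted}");
    }
}
```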
Waiting for the Pipeline to Finish
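Add the following code to wait for the pipeline to finish. The overall operation is finished when the tail of the pipeline finishes.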
```csharp
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
```
```vb
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
```
You can wait for dataflow completion from any thread, or from multiple threads at the same time.
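Because Completion is an ordinary Task, any number of threads can block on it or await it at the same time. The following C# sketch (the class name, the number of waiters, and the short Thread.Sleep standing in for real work are all illustrative) starts several thread-pool tasks that each call Completion.Wait on the same block, awaits the same task itself, and counts how many waiters are released once the block finishes.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class MultipleWaiters
{
    public static async Task<int> RunAsync(int waiterCount)
    {
        int released = 0;

        // Stand-in for the tail of a pipeline; the sleep simulates work.
        var tail = new ActionBlock<int>(n => Thread.Sleep(10));

        // Independent waiters, each blocking on its own thread-pool thread.
        Task[] waiters = Enumerable.Range(0, waiterCount)
            .Select(_ => Task.Run(() =>
            {
                tail.Completion.Wait();
                Interlocked.Increment(ref released);
            }))
            .ToArray();

        for (int i = 0; i < 5; i++)
            tail.Post(i);
        tail.Complete();

        // This method waits asynchronously on the same Completion task.
        await tail.Completion;
        await Task.WhenAll(waiters);
        return released;
    }

    static void Main()
    {
        Console.WriteLine($"Waiters released: {RunAsync(3).Result}");
    }
}
```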
The Complete Example
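The following example shows the complete code for this walkthrough. The C# version appears first, followed by the Visual Basic version, each with sample output. Because findReversedWords uses PLINQ, the order in which the reversed words are printed can vary from run to run, which is why the two sample outputs list the same pairs in different orders.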
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
    static void Main()
    {
        //
        // Create the members of the pipeline.
        //

        // Downloads the requested resource as a string.
        var downloadString = new TransformBlock<string, string>(async uri =>
        {
            Console.WriteLine("Downloading '{0}'...", uri);

            return await new HttpClient(new HttpClientHandler { AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
        });

        // Separates the specified text into an array of words.
        var createWordList = new TransformBlock<string, string[]>(text =>
        {
            Console.WriteLine("Creating word list...");

            // Remove common punctuation by replacing all non-letter characters
            // with a space character.
            char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
            text = new string(tokens);

            // Separate the text into an array of words.
            return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
        });

        // Removes short words and duplicates.
        var filterWordList = new TransformBlock<string[], string[]>(words =>
        {
            Console.WriteLine("Filtering word list...");

            return words
                .Where(word => word.Length > 3)
                .Distinct()
                .ToArray();
        });

        // Finds all words in the specified collection whose reverse also
        // exists in the collection.
        var findReversedWords = new TransformManyBlock<string[], string>(words =>
        {
            Console.WriteLine("Finding reversed words...");

            var wordsSet = new HashSet<string>(words);

            return from word in words.AsParallel()
                   let reverse = new string(word.Reverse().ToArray())
                   where word != reverse && wordsSet.Contains(reverse)
                   select word;
        });

        // Prints the provided reversed words to the console.
        var printReversedWords = new ActionBlock<string>(reversedWord =>
        {
            Console.WriteLine("Found reversed words {0}/{1}",
                reversedWord, new string(reversedWord.Reverse().ToArray()));
        });

        //
        // Connect the dataflow blocks to form a pipeline.
        //

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        downloadString.LinkTo(createWordList, linkOptions);
        createWordList.LinkTo(filterWordList, linkOptions);
        filterWordList.LinkTo(findReversedWords, linkOptions);
        findReversedWords.LinkTo(printReversedWords, linkOptions);

        // Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

        // Mark the head of the pipeline as complete.
        downloadString.Complete();

        // Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait();
    }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
```
```vb
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        '

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
            Function(text)
                Console.WriteLine("Creating word list...")

                ' Remove common punctuation by replacing all non-letter characters
                ' with a space character.
                Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
                text = New String(tokens)

                ' Separate the text into an array of words.
                Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
            End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
            Function(words)
                Console.WriteLine("Filtering word list...")

                Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
            End Function)

        ' Finds all words in the specified collection whose reverse also
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
            Function(words)
                Console.WriteLine("Finding reversed words...")

                Dim wordsSet = New HashSet(Of String)(words)

                Return From word In words.AsParallel()
                       Let reverse = New String(word.Reverse().ToArray())
                       Where word <> reverse AndAlso wordsSet.Contains(reverse)
                       Select word
            End Function)

        ' Prints the provided reversed words to the console.
        Dim printReversedWords = New ActionBlock(Of String)(
            Sub(reversedWord)
                Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
            End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw
```
Next Steps
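This example sends one URL through the dataflow pipeline to be processed. If you send more than one input value through a pipeline, you can introduce a form of parallelism into your application that resembles how parts move through an automobile factory. When the first member of the pipeline sends its result to the second member, it can process another item in parallel while the second member processes the first result.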
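The following C# sketch illustrates this overlap with two stages that each simulate 50 milliseconds of work with Thread.Sleep. The stage logic, the sleep, and the single-letter inputs are hypothetical stand-ins for real work such as downloading and parsing. Because the stages are separate blocks, stage 1 can start on the second item while stage 2 is still handling the first; the interleaved log lines show this, although their exact order can vary between runs. Each block keeps the default MaxDegreeOfParallelism of 1, so the final results arrive in input order.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class OverlappingStages
{
    public static async Task<List<string>> RunAsync(IEnumerable<string> items)
    {
        var results = new List<string>();

        var stage1 = new TransformBlock<string, string>(item =>
        {
            Console.WriteLine($"stage 1 starts {item}");
            Thread.Sleep(50); // simulated work
            return item.ToUpperInvariant();
        });

        var stage2 = new TransformBlock<string, string>(item =>
        {
            Console.WriteLine($"stage 2 starts {item}");
            Thread.Sleep(50); // simulated work
            return item + "!";
        });

        // Single-threaded sink, so List<T>.Add is never called concurrently.
        var sink = new ActionBlock<string>(item => results.Add(item));

        var options = new DataflowLinkOptions { PropagateCompletion = true };
        stage1.LinkTo(stage2, options);
        stage2.LinkTo(sink, options);

        foreach (string item in items)
            stage1.Post(item);
        stage1.Complete();

        await sink.Completion;
        return results;
    }

    static void Main()
    {
        List<string> results = RunAsync(new[] { "a", "b", "c" }).Result;
        Console.WriteLine(string.Join(", ", results)); // A!, B!, C!
    }
}
```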
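The parallelism achieved by using dataflow pipelines is known as coarse-grained parallelism, because it typically consists of fewer, larger tasks. You can also use fine-grained parallelism, made up of smaller, short-running tasks, inside a dataflow pipeline. In this example, the findReversedWords member of the pipeline uses PLINQ to process multiple items in the work list in parallel. Using fine-grained parallelism inside a coarse-grained pipeline can improve overall throughput.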
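The fine-grained part of findReversedWords can be tried on its own. The following C# sketch repeats that PLINQ query as an ordinary static method (the class and method names are hypothetical) so it can be tested without the pipeline. It adds AsOrdered, which the walkthrough omits: without it, PLINQ may return matches in any order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ReversedWordFinder
{
    public static string ReverseWord(string word) => new string(word.Reverse().ToArray());

    // Same filter as findReversedWords: keep a word when its reverse is a
    // different word that also appears in the input. AsOrdered makes PLINQ
    // return the matches in their original input order.
    public static string[] Find(string[] words)
    {
        var wordsSet = new HashSet<string>(words);

        return words.AsParallel()
            .AsOrdered()
            .Where(word =>
            {
                string reverse = ReverseWord(word);
                return word != reverse && wordsSet.Contains(reverse);
            })
            .ToArray();
    }

    static void Main()
    {
        string[] words = { "doom", "mood", "noon", "live", "evil", "area" };
        Console.WriteLine(string.Join(", ", Find(words))); // doom, mood, live, evil
    }
}
```

Palindromes such as noon are excluded by the word != reverse check, and area is excluded because aera is not in the sample input.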
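You can also connect a source dataflow block to multiple target blocks to create a dataflow network. An overloaded version of the LinkTo method takes a Predicate<T> object that defines whether the target block accepts each message based on its value. Most dataflow block types that act as sources offer messages to all connected target blocks, in the order in which they were connected, until one of the blocks accepts the message. By using this filtering mechanism, you can create systems of connected dataflow blocks that direct certain data through one path and other data through another path. For an example that uses filtering to create a dataflow network, see Walkthrough: Using Dataflow in a Windows Forms Application.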
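The following C# sketch shows predicate-based routing in a small network: a BufferBlock<T> sends even numbers to one ActionBlock<TInput> and everything else to a second one. The even/odd scenario and the class name are hypothetical. The second link has no predicate, so it acts as a catch-all. Without such a fallback, a message that no predicate accepts stays in the source block and can prevent the source from completing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EvenOddNetwork
{
    public static async Task<(List<int> Evens, List<int> Odds)> RunAsync(IEnumerable<int> numbers)
    {
        var evens = new List<int>();
        var odds = new List<int>();

        var source = new BufferBlock<int>();
        var evenTarget = new ActionBlock<int>(n => evens.Add(n));
        var oddTarget = new ActionBlock<int>(n => odds.Add(n));

        var options = new DataflowLinkOptions { PropagateCompletion = true };

        // Links are tried in the order they were created: the filtered link
        // first, then the catch-all link for anything it rejects.
        source.LinkTo(evenTarget, options, n => n % 2 == 0);
        source.LinkTo(oddTarget, options);

        foreach (int n in numbers)
            source.Post(n);
        source.Complete();

        await Task.WhenAll(evenTarget.Completion, oddTarget.Completion);
        return (evens, odds);
    }

    static void Main()
    {
        var (evens, odds) = RunAsync(new[] { 1, 2, 3, 4, 5, 6 }).Result;
        Console.WriteLine($"Even: {string.Join(", ", evens)}"); // Even: 2, 4, 6
        Console.WriteLine($"Odd:  {string.Join(", ", odds)}");  // Odd:  1, 3, 5
    }
}
```

If a message should simply be discarded when no predicate matches, linking the source to DataflowBlock.NullTarget<T>() as the last link is a common alternative to a catch-all consumer.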