Procedura dettagliata: Creazione di una pipeline del flusso di dati
Sebbene sia possibile usare i metodi DataflowBlock.Receive, DataflowBlock.ReceiveAsync e DataflowBlock.TryReceive per ricevere messaggi da blocchi di origine, è anche possibile connettere blocchi di messaggi per formare una pipeline del flusso di dati. Una pipeline del flusso di dati è costituita da una serie di componenti o blocchi di flussi di dati, ognuno dei quali esegue un'attività specifica che contribuisce a un obiettivo più grande. In ogni blocco di flussi di dati di una pipeline del flusso di dati viene eseguito un lavoro quando si riceve un messaggio da un altro blocco di flussi di dati. Un'analogia a questo è data da una catena di montaggio per la produzione di automobili. Man mano che ciascun veicolo passa attraverso la catena di montaggio, in una postazione viene assemblato il telaio, nella successiva viene installato il motore e così via. Grazie alla catena di montaggio in cui è possibile eseguire il montaggio di più veicoli contemporaneamente, si ottiene una maggiore produzione rispetto al montaggio completo dei veicoli uno alla volta.
Questo documento illustra una pipeline del flusso dati che scarica il libro L'Iliade di Omero da un sito Web ed esegue ricerche nel testo per trovare corrispondenza tra parole specifiche e le stesse parole con l'ordine dei caratteri rovesciato. La formazione della pipeline del flusso di dati in questo documento consiste nei passaggi seguenti:
Creare i blocchi di flussi di dati che fanno parte della pipeline.
Connettere ogni blocco di flussi di dati al blocco successivo nella pipeline. L'output del blocco precedente nella pipeline viene ricevuto da ogni blocco come input.
Per ogni blocco di flussi di dati, creare un'attività di continuazione mediante la quale il blocco successivo viene impostato sullo stato completato al termine del blocco precedente.
Inserire i dati nell'intestazione della pipeline.
Contrassegnare l'intestazione della pipeline come completata.
Attendere il completamento di tutto il lavoro da parte della pipeline.
Prerequisiti
Prima di iniziare questa procedura dettagliata, leggere Flusso di dati.
Creazione di un'applicazione console
In Visual Studio creare un progetto Applicazione console di Visual C# o Visual Basic. Installare il pacchetto System.Threading.Tasks.Dataflow NuGet.
Nota
La libreria del flusso di dati TPL (spazio dei nomi System.Threading.Tasks.Dataflow) non viene distribuita con .NET. Per installare lo spazio dei nomi System.Threading.Tasks.Dataflow in Visual Studio, aprire il progetto in Visual Studio, scegliere Gestisci pacchetti NuGet dal menu Progetto ed eseguire una ricerca online del pacchetto System.Threading.Tasks.Dataflow
. In alternativa, per installarlo usando l'interfaccia della riga di comando di .NET Core, eseguire dotnet add package System.Threading.Tasks.Dataflow
.
Aggiungere il seguente codice al progetto per creare l'applicazione di base.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
static void Main()
{
}
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
End Sub
End Module
Creazione dei blocchi di flussi di dati
Aggiungere il seguente codice al metodo Main
per creare i blocchi di flussi di dati che fanno parte della pipeline. Nella tabella seguente viene riepilogato il ruolo di ciascun membro della pipeline.
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine("Downloading '{0}'...", uri);
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine("Found reversed words {0}/{1}",
reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
Membro | Type | Descrizione |
---|---|---|
downloadString |
TransformBlock<TInput,TOutput> | Scarica il testo del libro dal Web. |
createWordList |
TransformBlock<TInput,TOutput> | Suddivide il testo del libro in una matrice di parole. |
filterWordList |
TransformBlock<TInput,TOutput> | Rimuove le parole brevi e i duplicati dalla matrice di parole. |
findReversedWords |
TransformManyBlock<TInput,TOutput> | Cerca tutte le parole nella raccolta filtrata della matrice di parole i cui contrari sono presenti anche nella matrice di parole. |
printReversedWords |
ActionBlock<TInput> | Visualizza le parole e i contrari corrispondenti nella console. |
Sebbene sia possibile combinare più passaggi nella pipeline del flusso di dati in questo esempio in un unico passaggio, nell'esempio viene illustrato il concetto di composizione di più attività del flusso di dati indipendenti per eseguire un'attività più grande. Nell'esempio viene utilizzato l'oggetto TransformBlock<TInput,TOutput> per consentire a ogni membro della pipeline di eseguire un'operazione sui relativi dati di input e inviare i risultati al passaggio successivo nella pipeline. Il membro findReversedWords
della pipeline è un oggetto TransformManyBlock<TInput,TOutput> poiché tramite esso vengono generati più output indipendenti per ogni input. La parte finale della pipeline, printReversedWords
, è un oggetto ActionBlock<TInput> poiché tramite esso viene eseguita un'azione sul relativo input e non viene generato un risultato.
Formazione della pipeline
Aggiungere il codice seguente per connettere ogni blocco al successivo nella pipeline.
Quando si chiama il metodo LinkTo per connettere un blocco di origine del flusso di dati a un blocco di destinazione del flusso di dati, i dati nel blocco di origine del flusso di dati vengono propagati nel blocco di destinazione quando diventano disponibili. Se si specifica anche DataflowLinkOptions con PropagateCompletion impostato su true, l'esito positivo o negativo di un blocco nella pipeline causerà il completamento del blocco successivo nella pipeline.
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
Inserimento dei dati nella pipeline
Aggiungere il codice seguente per inserire l'URL del libro L'Iliade di Omero nell'intestazione della pipeline del flusso di dati.
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
In questo esempio viene usato il metodo DataflowBlock.Post in modo sincrono per inviare i dati all'intestazione della pipeline. Utilizzare il metodo DataflowBlock.SendAsync quando è necessario inviare in modo asincrono i dati a un nodo del flusso di dati.
Completamento dell'attività della pipeline
Aggiungere il codice seguente per contrassegnare l'intestazione della pipeline come completata. L'intestazione della pipeline propaga il completamento dopo l'elaborazione di tutti i messaggi memorizzati nel buffer.
// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()
In questo esempio viene inviato un URL tramite la pipeline del flusso di dati da elaborare. Se si invia più di un input attraverso una pipeline, chiamare il metodo IDataflowBlock.Complete dopo l'invio di tutti gli input. È possibile omettere questo passaggio se nell'applicazione non vi è alcun punto ben definito in cui i dati non sono più disponibili o non è necessaria l'attesa del completamento della pipeline da parte dell'applicazione.
Attesa del completamento della pipeline
Aggiungere il codice seguente per attendere il completamento della pipeline. L'operazione globale viene completata al termine della parte finale della pipeline.
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
È possibile attendere il completamento del flusso di dati da qualsiasi thread o da più thread contemporaneamente.
Esempio completo
Nell'esempio riportato di seguito viene illustrato il codice completo per questa procedura guidata.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
static void Main()
{
//
// Create the members of the pipeline.
//
// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
Console.WriteLine("Downloading '{0}'...", uri);
return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});
// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
Console.WriteLine("Creating word list...");
// Remove common punctuation by replacing all non-letter characters
// with a space character.
char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
text = new string(tokens);
// Separate the text into an array of words.
return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});
// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
Console.WriteLine("Filtering word list...");
return words
.Where(word => word.Length > 3)
.Distinct()
.ToArray();
});
// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
Console.WriteLine("Finding reversed words...");
var wordsSet = new HashSet<string>(words);
return from word in words.AsParallel()
let reverse = new string(word.Reverse().ToArray())
where word != reverse && wordsSet.Contains(reverse)
select word;
});
// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
Console.WriteLine("Found reversed words {0}/{1}",
reversedWord, new string(reversedWord.Reverse().ToArray()));
});
//
// Connect the dataflow blocks to form a pipeline.
//
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
// Mark the head of the pipeline as complete.
downloadString.Complete();
// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
}
}
/* Sample output:
Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
Creating word list...
Filtering word list...
Finding reversed words...
Found reversed words doom/mood
Found reversed words draw/ward
Found reversed words aera/area
Found reversed words seat/taes
Found reversed words live/evil
Found reversed words port/trop
Found reversed words sleek/keels
Found reversed words area/aera
Found reversed words tops/spot
Found reversed words evil/live
Found reversed words mood/doom
Found reversed words speed/deeps
Found reversed words moor/room
Found reversed words trop/port
Found reversed words spot/tops
Found reversed words spots/stops
Found reversed words stops/spots
Found reversed words reed/deer
Found reversed words keels/sleek
Found reversed words deeps/speed
Found reversed words deer/reed
Found reversed words taes/seat
Found reversed words room/moor
Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web
' and finds all reversed words that appear in that book.
Module DataflowReversedWords
Sub Main()
'
' Create the members of the pipeline.
'
' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
Async Function(uri)
Console.WriteLine("Downloading '{0}'...", uri)
Return Await New HttpClient().GetStringAsync(uri)
End Function)
' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
Function(text)
Console.WriteLine("Creating word list...")
' Remove common punctuation by replacing all non-letter characters
' with a space character.
Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
text = New String(tokens)
' Separate the text into an array of words.
Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
End Function)
' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
Function(words)
Console.WriteLine("Filtering word list...")
Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
End Function)
' Finds all words in the specified collection whose reverse also
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
Function(words)
Dim wordsSet = New HashSet(Of String)(words)
Return From word In words.AsParallel()
Let reverse = New String(word.Reverse().ToArray())
Where word <> reverse AndAlso wordsSet.Contains(reverse)
Select word
End Function)
' Prints the provided reversed words to the console.
Dim printReversedWords = New ActionBlock(Of String)(
Sub(reversedWord)
Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
End Sub)
'
' Connect the dataflow blocks to form a pipeline.
'
Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}
downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")
' Mark the head of the pipeline as complete.
downloadString.Complete()
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()
End Sub
End Module
' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw
Passaggi successivi
In questo esempio viene inviato un URL da elaborare tramite la pipeline del flusso di dati. Se si invia più di un valore di input tramite una pipeline, è possibile introdurre un form di parallelismo nell'applicazione che rappresenta il numero di parti che possono essere spostate in una fabbrica di automobili. Quando tramite il primo membro della pipeline viene inviato il risultato al secondo membro, è possibile elaborare un altro elemento in parallelo mentre tramite il secondo membro viene elaborato il primo risultato.
Il parallelismo raggiunto attraverso l'uso di pipeline di flussi di dati è noto come parallelismo con granularità grossolana poiché consiste in genere in attività più o meno grandi. È anche possibile usare un parallelismo con granularità più fine di attività più piccole a esecuzione breve in una pipeline del flusso di dati. In questo esempio, nel membro findReversedWords
della pipeline viene usato PLINQ per elaborare in parallelo più elementi nell'elenco di lavoro. L'utilizzo di parallelismo con granularità fine in una pipeline con granularità grossolana può migliorare la velocità effettiva globale.
È anche possibile connettere un blocco di origine del flusso di dati a più blocchi di destinazione per creare una rete del flusso di dati. La versione sottoposta a overload del metodo LinkTo accetta un oggetto Predicate<T> mediante il quale viene definito se il blocco di destinazione accetta ogni messaggio in base al relativo valore. La maggior parte dei tipi di blocchi di flussi di dati utilizzati come origini offrono messaggi a tutti i blocchi di destinazione connessi, nell'ordine in cui sono stati collegati, finché uno dei blocchi non accetta il messaggio. Tramite questo meccanismo di filtro, è possibile creare sistemi di blocchi di flussi di dati connessi in cui determinati dati vengono indirizzati tramite un percorso mentre altri tramite un altro percorso. Per un esempio che usa il filtro per creare una rete del flusso di dati, vedere Procedura dettagliata: Uso del flusso di dati in un'applicazione Windows Forms.