Condividi tramite


Procedura dettagliata: Creazione di una pipeline del flusso di dati

Sebbene sia possibile usare i metodi DataflowBlock.Receive, DataflowBlock.ReceiveAsync e DataflowBlock.TryReceive per ricevere messaggi da blocchi di origine, è anche possibile connettere blocchi di messaggi per formare una pipeline del flusso di dati. Una pipeline del flusso di dati è costituita da una serie di componenti o blocchi di flussi di dati, ognuno dei quali esegue un'attività specifica che contribuisce a un obiettivo più grande. In ogni blocco di flussi di dati di una pipeline del flusso di dati viene eseguito un lavoro quando si riceve un messaggio da un altro blocco di flussi di dati. Un'analogia a questo è data da una catena di montaggio per la produzione di automobili. Man mano che ciascun veicolo passa attraverso la catena di montaggio, in una postazione viene assemblato il telaio, nella successiva viene installato il motore e così via. Grazie alla catena di montaggio in cui è possibile eseguire il montaggio di più veicoli contemporaneamente, si ottiene una maggiore produzione rispetto al montaggio completo dei veicoli uno alla volta.

Questo documento illustra una pipeline del flusso dati che scarica il libro L'Iliade di Omero da un sito Web ed esegue ricerche nel testo per trovare corrispondenza tra parole specifiche e le stesse parole con l'ordine dei caratteri rovesciato. La formazione della pipeline del flusso di dati in questo documento consiste nei passaggi seguenti:

  1. Creare i blocchi di flussi di dati che fanno parte della pipeline.

  2. Connettere ogni blocco di flussi di dati al blocco successivo nella pipeline. L'output del blocco precedente nella pipeline viene ricevuto da ogni blocco come input.

  3. Per ogni blocco di flussi di dati, creare un'attività di continuazione mediante la quale il blocco successivo viene impostato sullo stato completato al termine del blocco precedente.

  4. Inserire i dati nell'intestazione della pipeline.

  5. Contrassegnare l'intestazione della pipeline come completata.

  6. Attendere il completamento di tutto il lavoro da parte della pipeline.

Prerequisiti

Prima di iniziare questa procedura dettagliata, leggere Flusso di dati.

Creazione di un'applicazione console

In Visual Studio creare un progetto Applicazione console di Visual C# o Visual Basic. Installare il pacchetto System.Threading.Tasks.Dataflow NuGet.

Nota

La libreria del flusso di dati TPL (spazio dei nomi System.Threading.Tasks.Dataflow) non viene distribuita con .NET. Per installare lo spazio dei nomi System.Threading.Tasks.Dataflow in Visual Studio, aprire il progetto in Visual Studio, scegliere Gestisci pacchetti NuGet dal menu Progetto ed eseguire una ricerca online del pacchetto System.Threading.Tasks.Dataflow. In alternativa, per installarlo usando l'interfaccia della riga di comando di .NET Core, eseguire dotnet add package System.Threading.Tasks.Dataflow.

Aggiungere il seguente codice al progetto per creare l'applicazione di base.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

Creazione dei blocchi di flussi di dati

Aggiungere il seguente codice al metodo Main per creare i blocchi di flussi di dati che fanno parte della pipeline. Nella tabella seguente viene riepilogato il ruolo di ciascun membro della pipeline.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine("Downloading '{0}'...", uri);

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine("Found reversed words {0}/{1}",
      reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
Membro Type Descrizione
downloadString TransformBlock<TInput,TOutput> Scarica il testo del libro dal Web.
createWordList TransformBlock<TInput,TOutput> Suddivide il testo del libro in una matrice di parole.
filterWordList TransformBlock<TInput,TOutput> Rimuove le parole brevi e i duplicati dalla matrice di parole.
findReversedWords TransformManyBlock<TInput,TOutput> Cerca tutte le parole nella raccolta filtrata della matrice di parole i cui contrari sono presenti anche nella matrice di parole.
printReversedWords ActionBlock<TInput> Visualizza le parole e i contrari corrispondenti nella console.

Sebbene sia possibile combinare più passaggi nella pipeline del flusso di dati in questo esempio in un unico passaggio, nell'esempio viene illustrato il concetto di composizione di più attività del flusso di dati indipendenti per eseguire un'attività più grande. Nell'esempio viene utilizzato l'oggetto TransformBlock<TInput,TOutput> per consentire a ogni membro della pipeline di eseguire un'operazione sui relativi dati di input e inviare i risultati al passaggio successivo nella pipeline. Il membro findReversedWords della pipeline è un oggetto TransformManyBlock<TInput,TOutput> poiché tramite esso vengono generati più output indipendenti per ogni input. La parte finale della pipeline, printReversedWords, è un oggetto ActionBlock<TInput> poiché tramite esso viene eseguita un'azione sul relativo input e non viene generato un risultato.

Formazione della pipeline

Aggiungere il codice seguente per connettere ogni blocco al successivo nella pipeline.

Quando si chiama il metodo LinkTo per connettere un blocco di origine del flusso di dati a un blocco di destinazione del flusso di dati, i dati nel blocco di origine del flusso di dati vengono propagati nel blocco di destinazione quando diventano disponibili. Se si specifica anche DataflowLinkOptions con PropagateCompletion impostato su true, l'esito positivo o negativo di un blocco nella pipeline causerà il completamento del blocco successivo nella pipeline.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

Inserimento dei dati nella pipeline

Aggiungere il codice seguente per inserire l'URL del libro L'Iliade di Omero nell'intestazione della pipeline del flusso di dati.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

In questo esempio viene usato il metodo DataflowBlock.Post in modo sincrono per inviare i dati all'intestazione della pipeline. Utilizzare il metodo DataflowBlock.SendAsync quando è necessario inviare in modo asincrono i dati a un nodo del flusso di dati.

Completamento dell'attività della pipeline

Aggiungere il codice seguente per contrassegnare l'intestazione della pipeline come completata. L'intestazione della pipeline propaga il completamento dopo l'elaborazione di tutti i messaggi memorizzati nel buffer.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

In questo esempio viene inviato un URL tramite la pipeline del flusso di dati da elaborare. Se si invia più di un input attraverso una pipeline, chiamare il metodo IDataflowBlock.Complete dopo l'invio di tutti gli input. È possibile omettere questo passaggio se nell'applicazione non vi è alcun punto ben definito in cui i dati non sono più disponibili o non è necessaria l'attesa del completamento della pipeline da parte dell'applicazione.

Attesa del completamento della pipeline

Aggiungere il codice seguente per attendere il completamento della pipeline. L'operazione globale viene completata al termine della parte finale della pipeline.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

È possibile attendere il completamento del flusso di dati da qualsiasi thread o da più thread contemporaneamente.

Esempio completo

Nell'esempio riportato di seguito viene illustrato il codice completo per questa procedura guidata.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine("Downloading '{0}'...", uri);

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine("Found reversed words {0}/{1}",
            reversedWord, new string(reversedWord.Reverse().ToArray()));
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

Passaggi successivi

In questo esempio viene inviato un URL da elaborare tramite la pipeline del flusso di dati. Se si invia più di un valore di input tramite una pipeline, è possibile introdurre un form di parallelismo nell'applicazione che rappresenta il numero di parti che possono essere spostate in una fabbrica di automobili. Quando tramite il primo membro della pipeline viene inviato il risultato al secondo membro, è possibile elaborare un altro elemento in parallelo mentre tramite il secondo membro viene elaborato il primo risultato.

Il parallelismo raggiunto attraverso l'uso di pipeline di flussi di dati è noto come parallelismo con granularità grossolana poiché consiste in genere in attività più o meno grandi. È anche possibile usare un parallelismo con granularità più fine di attività più piccole a esecuzione breve in una pipeline del flusso di dati. In questo esempio, nel membro findReversedWords della pipeline viene usato PLINQ per elaborare in parallelo più elementi nell'elenco di lavoro. L'utilizzo di parallelismo con granularità fine in una pipeline con granularità grossolana può migliorare la velocità effettiva globale.

È anche possibile connettere un blocco di origine del flusso di dati a più blocchi di destinazione per creare una rete del flusso di dati. La versione sottoposta a overload del metodo LinkTo accetta un oggetto Predicate<T> mediante il quale viene definito se il blocco di destinazione accetta ogni messaggio in base al relativo valore. La maggior parte dei tipi di blocchi di flussi di dati utilizzati come origini offrono messaggi a tutti i blocchi di destinazione connessi, nell'ordine in cui sono stati collegati, finché uno dei blocchi non accetta il messaggio. Tramite questo meccanismo di filtro, è possibile creare sistemi di blocchi di flussi di dati connessi in cui determinati dati vengono indirizzati tramite un percorso mentre altri tramite un altro percorso. Per un esempio che usa il filtro per creare una rete del flusso di dati, vedere Procedura dettagliata: Uso del flusso di dati in un'applicazione Windows Forms.

Vedi anche