Compartir a través de


Tutorial: Creación de una canalización de flujos de datos

Aunque puede usar los métodos DataflowBlock.Receive, DataflowBlock.ReceiveAsync y DataflowBlock.TryReceive para recibir mensajes de los bloques de origen, también puede conectar los bloques de mensajes para formar una canalización de flujo de datos. Una canalización de flujo datos es una serie de componentes, o bloques de flujo de datos, de los que cada uno realiza una tarea concreta que contribuye a lograr un objetivo mayor. Cada bloque de flujo de datos de una canalización de flujo de datos realiza un determinado trabajo cuando recibe un mensaje de otro bloque de flujo de datos. Se podría establecer una analogía de esto con una cadena de montaje en la fabricación de automóviles. Mientras cada vehículo pasa a través de la línea de montaje, una estación monta el bastidor, la siguiente instala el motor y así sucesivamente. Dado que una cadena de montaje permite montar varios vehículos al mismo tiempo, proporciona un mejor rendimiento que montar de uno en uno los vehículos completos.

En este documento se muestra una canalización de flujo de datos que descarga el libro La Ilíada de Homero desde un sitio web y explora el texto para encontrar coincidencias entre palabras individuales y palabras que revierten el orden de los caracteres de la primera palabra. La formación de la canalización de flujo de datos en este documento se compone de los siguientes pasos:

  1. Crear los bloques de flujo de datos que participan en la canalización.

  2. Conectar cada bloque de flujo de datos con el siguiente bloque de la canalización. Cada bloque recibe como entrada la salida del bloque anterior de la canalización.

  3. Para cada bloque de flujo de datos, crear una tarea de continuación que establezca el siguiente bloque en estado completado después de que finalice el bloque anterior.

  4. Publicar datos en el encabezado de la canalización.

  5. Marcar el encabezado de la canalización como completado.

  6. Esperar a que la canalización complete todo el trabajo.

Requisitos previos

Lea Flujo de datos antes de empezar este tutorial.

Crear una aplicación de consola

En Visual Studio, cree un proyecto Aplicación de consola de Visual C# o Visual Basic. Instale el paquete NuGet System.Threading.Tasks.Dataflow.

Nota:

La biblioteca de flujos de datos TPL (el espacio de nombres System.Threading.Tasks.Dataflow) no se distribuye con .NET. Para instalar el espacio de nombres System.Threading.Tasks.Dataflow en Visual Studio, abra el proyecto, seleccione Administrar paquetes NuGet en el menú Proyecto y busque en línea el paquete System.Threading.Tasks.Dataflow. Como alternativa, para realizar la instalación con la CLI de .Net Core, ejecute dotnet add package System.Threading.Tasks.Dataflow.

Agregue el código siguiente a su proyecto para crear la aplicación básica.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

Creación de los bloques de flujo de datos

Agregue el código siguiente al método Main para crear los bloques de flujo de datos que participan en la canalización. En la tabla siguiente se resume el rol de cada miembro de la canalización.

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine("Downloading '{0}'...", uri);

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine("Found reversed words {0}/{1}",
      reversedWord, new string(reversedWord.Reverse().ToArray()));
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
Miembro Tipo Descripción
downloadString TransformBlock<TInput,TOutput> Descarga el texto del libro desde la Web.
createWordList TransformBlock<TInput,TOutput> Separa el texto del libro en una matriz de palabras.
filterWordList TransformBlock<TInput,TOutput> Quita palabras cortas y duplicados de la matriz de palabras.
findReversedWords TransformManyBlock<TInput,TOutput> Busca todas las palabras de la colección de matriz de palabras filtradas cuya palabra inversa también se encuentra en la matriz de palabras.
printReversedWords ActionBlock<TInput> Muestra las palabras y sus correspondientes palabras inversas en la consola.

Aunque se podrían combinar varios pasos de la canalización de flujo de datos de este ejemplo en un solo paso, en el ejemplo se muestra el concepto de componer varias tareas de flujo de datos independientes para realizar una tarea mayor. En el ejemplo se usa TransformBlock<TInput,TOutput> para permitir que cada miembro de la canalización pueda realizar una operación en sus datos de entrada y enviar los resultados al siguiente paso de la canalización. El miembro findReversedWords de la canalización es un objeto TransformManyBlock<TInput,TOutput> porque genera varias salidas independientes para cada entrada. El final de la canalización, printReversedWords, es un objeto ActionBlock<TInput> porque realiza una acción en su entrada y no genera una salida.

Formación de la canalización

Agregue el código siguiente para conectar cada bloque con el bloque siguiente en la canalización.

Cuando se llama al método LinkTo para conectar un bloque de flujo de datos de origen a un bloque de flujo de datos de destino, el bloque de origen propaga los datos al bloque de destino a medida que los datos se encuentran disponibles. Si también proporciona DataflowLinkOptions con PropagateCompletion establecido en true, la finalización correcta o incorrecta de un bloque de la canalización conllevará la finalización del siguiente bloque de la canalización.

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

Envío de datos a la canalización

Agregue el código siguiente para publicar la dirección URL del libro La Ilíada de Homero en el encabezado de la canalización de flujo de datos.

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

En este ejemplo se usa DataflowBlock.Post para enviar datos de forma sincrónica al encabezado de la canalización. Use el método DataflowBlock.SendAsync cuando deba enviar datos de forma asincrónica a un nodo de flujo de datos.

Finalización de la actividad de canalización

Agregue el código siguiente para marcar el encabezado de la canalización como completado. El encabezado de la canalización propaga su finalización después de procesar todos los mensajes almacenados en búfer.

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

En este ejemplo se envía una dirección URL a través de la canalización de flujo de datos para que se procese. Si envía más de una entrada a través de una canalización, llame al método IDataflowBlock.Complete después de enviar toda la entrada. Puede omitir este paso si la aplicación no tiene ningún punto bien definido en el que los datos ya no están disponibles o la aplicación no tiene que esperar a que finalice la canalización.

Espera para la finalización de la canalización

Agregue el código siguiente para esperar a que finalice la canalización. La operación global termina cuando finaliza la cola de la canalización.

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

Puede esperar la finalización del flujo de datos desde cualquier subproceso o desde varios subprocesos al mismo tiempo.

Ejemplo completo

En el ejemplo siguiente se muestra el código completo de este tutorial.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine("Downloading '{0}'...", uri);

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine("Found reversed words {0}/{1}",
            reversedWord, new string(reversedWord.Reverse().ToArray()));
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

Pasos siguientes

En este ejemplo se envía una dirección URL para procesarla a través de la canalización de flujo de datos. Si envía más de un valor de entrada a través de una canalización, puede introducir un formulario de paralelismo en la aplicación que se parezca a cómo se moverían las piezas en una fábrica de automóviles. Cuando el primer miembro de la canalización envía su resultado al segundo miembro, puede procesar otro elemento en paralelo mientras el segundo miembro procesa el primer resultado.

El paralelismo que se logra mediante el uso de canalizaciones de flujo de datos se conoce como paralelismo general porque normalmente consta de menos tareas y más grandes. También puede usar un paralelismo específico de tareas más pequeñas y breves en una canalización de flujo de datos. En este ejemplo, el miembro findReversedWords de la canalización usa PLINQ para procesar en paralelo varios elementos de la lista de trabajo. El uso de paralelismo de grano fino en una canalización de grano grueso puede mejorar el rendimiento global.

También puede conectar un bloque de flujo de datos de origen a varios bloques de destino para crear una red de flujo de datos. La versión sobrecargada del método LinkTo toma un objeto Predicate<T> que define si el bloque de destino acepta cada mensaje según su valor. La mayoría de los tipos de bloques de flujo de datos que actúan como orígenes ofrecen mensajes a todos los bloques de destino conectados, siguiendo el orden en que se conectaron, hasta que uno de los bloques acepta ese mensaje. Mediante este mecanismo de filtrado, puede crear sistemas de bloques de flujo de datos conectados que dirigen determinados datos a través de una ruta de acceso y otros datos a través de otra ruta de acceso. Para ver un ejemplo que usa el filtrado para crear una red de flujo de datos, consulte Tutorial: Uso de flujos de datos en aplicaciones de Windows Forms.

Vea también