Compartilhar via


Fluxo de dados (Task Parallel Library)

A TPL (biblioteca de paralelismo de tarefas) fornece componentes de fluxo de dados para ajudar a aumentar a robustez de aplicativos habilitados para simultaneidade. Esses componentes de fluxo de dados são coletivamente chamados de biblioteca de fluxos de dados TPL. Esse modelo de fluxo de dados promove programação baseada em ator que fornece transmissão de mensagem no processo para fluxo de dados de alta granularidade e tarefas de pipelining. Os componentes de fluxo de dados baseiam-se nos tipos e na infraestrutura de agendamento da TPL e integram-se ao suporte às linguagens C#, Visual Basic e F# para programação assíncrona. Esses componentes do fluxo de dados são úteis quando você tem várias operações que devem se comunicar umas com as outras de modo assíncrono ou quando você deseja processar dados à medida que são disponibilizados. Por exemplo, considere um aplicativo que processa dados de imagem de uma webcam. Usando o modelo de fluxo de dados, o aplicativo pode processar quadros de imagem assim que eles se tornarem disponíveis. Se o aplicativo aprimora os quadros de imagem, por exemplo, executando a redução de olhos vermelhos ou correção de luz, você pode criar um pipeline dos componentes de fluxo de dados. Cada estágio do pipeline pode usar mais funcionalidade de paralelismo de alta granularidade, assim como a funcionalidade fornecida pela TPL, para transformar a imagem.

Este documento fornece uma visão geral da biblioteca de fluxos de dados TPL. Ele descreve o modelo de programação, os tipos de blocos de fluxo de dados predefinidos e como configurar os blocos de fluxo de dados para atender os requisitos específicos de seus aplicativos.

Observação

A Biblioteca de Fluxo de Dados TPL (o namespace System.Threading.Tasks.Dataflow) não é distribuída com o .NET. Para instalar o namespace System.Threading.Tasks.Dataflow no Visual Studio, abra o projeto, escolha Gerenciar Pacotes NuGet no menu Projeto e pesquise online o pacote System.Threading.Tasks.Dataflow. Como alternativa, instale-o usando a CLI do .NET Core e execute dotnet add package System.Threading.Tasks.Dataflow.

Modelo de Programação

A biblioteca de fluxos de dados TPL fornece uma base para a transmissão de mensagens e a paralelização de aplicativos com uso intensivo de CPU e de E/S que têm alta taxa de transferência e baixa latência. Ela também fornece controle explícito sobre como os dados são armazenados em buffer e se movem pelo sistema. Para entender melhor o modelo de programação de fluxo de dados, considere um aplicativo que carrega imagens do disco de forma assíncrona e cria uma composição dessas imagens. Modelos de programação tradicionais normalmente exigem que você use retornos de chamada e objetos de sincronização, tais como bloqueios, para coordenar tarefas e acesso a dados compartilhados. Usando o modelo de programação de fluxo de dados, você pode criar objetos de fluxo de dados que processam imagens conforme elas são lidas do disco. No modelo de fluxo de dados, você declara como os dados serão manipulados quando se tornarem disponíveis e também quaisquer eventuais dependências entre os dados. Já que o runtime gerencia as dependências entre os dados, muitas vezes você pode evitar a necessidade de sincronizar o acesso aos dados compartilhados. Além disso, já que o runtime agenda o trabalho com base na chegada assíncrona de dados, o fluxo de dados pode aumentar a capacidade de resposta e a taxa de transferência ao gerenciar com eficiência os threads subjacentes. Para obter um exemplo que usa o modelo de programação de fluxo de dados para implementar o processamento de imagens em um aplicativo Windows Forms, consulte Passo a passo: usando um fluxo de dados em um Aplicativo do Windows Forms.

Origens e Destinos

A biblioteca de fluxos de dados TPL consiste em blocos de fluxo de dados, que são estruturas de dados que armazenam dados em buffer e os processam. A TPL define três tipos de blocos de fluxo de dados: blocos de origem, blocos de destino e blocos propagadores. Um bloco de origem atua como uma fonte de dados e possibilita a leitura de dados presentes nele. Um bloco de destino atua como um receptor de dados e possibilita que nele sejam gravados dados. Um bloco propagador atua como um bloco de origem e um bloco de destino e possibilita tanto leitura quanto gravação. A TPL define a interface System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> para representar origens, System.Threading.Tasks.Dataflow.ITargetBlock<TInput> para representar destinos e System.Threading.Tasks.Dataflow.IPropagatorBlock<TInput,TOutput> para representar propagadores. IPropagatorBlock<TInput,TOutput> herda de ISourceBlock<TOutput> e de ITargetBlock<TInput>.

A Biblioteca de fluxos de dados TPL fornece vários tipos de bloco de fluxo de dados predefinidos que implementam as interfaces ISourceBlock<TOutput>, ITargetBlock<TInput> e IPropagatorBlock<TInput,TOutput>. Esses tipos de bloco de fluxo de dados são descritos neste documento na seção Tipos de bloco de fluxo de dados predefinidos.

Conectar blocos

Você pode conectar blocos de fluxo de dados para formar pipelines, que são sequências lineares de blocos de fluxo de dados, ou então redes, que são gráficos de blocos de fluxo de dados. Um pipeline é uma forma de rede. Em uma rede ou pipeline, origens propagam dados assincronamente para destinos assim que os dados ficam disponíveis. O método ISourceBlock<TOutput>.LinkTo vincula um bloco de fluxo de dados de origem a um bloco de destino. Uma origem pode ser vinculada a zero ou mais destinos, enquanto os destinos podem ser vinculados de zero ou mais origens. Você pode adicionar ou remover blocos de fluxo de dados para ou de um pipeline ou rede simultaneamente. Os tipos de bloco de fluxo de dados predefinidos tratam de todos os aspectos de acesso thread-safe de vincular e desvincular.

Para obter um exemplo que conecta os blocos de fluxo de dados para formar um pipeline básico, consulte Passo a passo: criando um pipeline de fluxo de dados. Para obter um exemplo que conecta os blocos de fluxo de dados para formar uma rede mais complexa, consulte Passo a passo: usando o fluxo de dados em um Aplicativo do Windows Forms. Para obter um exemplo que desvincula um destino de uma origem depois que a origem oferece uma mensagem ao destino, consulte Como desvincular blocos de fluxo de dados.

Filtragem

Ao chamar o método ISourceBlock<TOutput>.LinkTo para vincular uma origem a um destino, você pode fornecer um representante que determina se o bloco de destino aceita ou rejeita uma mensagem com base no valor dela. Esse mecanismo de filtragem é uma maneira útil para garantir que um bloco de fluxo de dados receba apenas determinados valores. Para a maioria dos tipos de bloco de fluxo de dados predefinidos, se um bloco de origem estiver conectado a vários blocos de destino, quando um bloco de destino rejeitar uma mensagem, a origem oferecerá essa mensagem para o próximo destino. A ordem em que uma origem oferece mensagens para destinos é definida pela origem e pode variar de acordo com o tipo da origem. A maioria dos tipos de blocos de origem para de oferecer uma mensagem depois que um destino aceita essa mensagem. Uma exceção a essa regra é a classe BroadcastBlock<T>, que oferece cada mensagem para todos os destinos, mesmo que alguns destinos rejeitem a mensagem. Para obter um exemplo que usa a filtragem para processar apenas mensagens específicas, consulte Passo a passo: usando o fluxo de dados em um Aplicativo do Windows Forms.

Importante

Já que cada tipo de bloco de fluxo de dados de origem predefinido garante que as mensagens sejam propagadas na ordem em que são recebidas, toda mensagem deve ser lida do bloco de origem antes que o bloco de origem possa processar a próxima mensagem. Portanto, quando você usa a filtragem para conectar vários destinos a uma origem, certifique-se de que pelo menos um bloco de destino receba cada mensagem. Caso contrário, seu aplicativo poderá sofrer deadlock.

Transmissão de mensagens

O modelo de programação de fluxo de dados está relacionado ao conceito de transmissão de mensagens, em que componentes independentes de um programa se comunicam uns com os outros pelo envio de mensagens. Uma maneira de propagar mensagens entre componentes de aplicativos é chamar os métodos Post (síncrono) e SendAsync (assíncrono) para enviar mensagens para blocos de fluxo de dados de destino, e os métodos Receive, ReceiveAsync e TryReceive para receber mensagens de blocos de origem. Você pode combinar esses métodos com redes ou pipelines de fluxo de dados por meio do envio de dados de entrada para o nó de cabeçalho (um bloco de destino) e receber dados de saída do nó terminal do pipeline ou nós terminais da rede (um ou mais blocos de origem). Você também pode usar o método Choose para ler a partir da primeira origem fornecida que tem dados disponíveis e executar ações nesses dados.

Os blocos de código-fonte oferecem dados ao blocos de destino chamando o método ITargetBlock<TInput>.OfferMessage. O bloco de destino responde a uma mensagem oferecida em uma destas três maneiras: ele pode aceitar a mensagem, recusá-la ou adiá-la. Quando o destino aceita a mensagem, o método OfferMessage retornará Accepted. Quando o destino rejeita a mensagem, o método OfferMessage retornará Declined. Quando o destino exige não receber mais as mensagens da origem, OfferMessage retornará DecliningPermanently. Os tipos de bloco de origem predefinidos não oferecem mensagens para destinos vinculados após o recebimento desse valor retornado e eles se desvinculam automaticamente desses destinos.

Quando um bloco de destino adia a mensagem para uso posterior, o método OfferMessage retornará Postponed. Um bloco de destino que adia uma mensagem pode mais tarde chamar o método ISourceBlock<TOutput>.ReserveMessage para tentar reservar a mensagem oferecida. Neste ponto, é possível que a mensagem ainda esteja disponível e possa ser usada pelo bloco de destino ou então que a mensagem tenha sido tomada por outro destino. Quando, posteriormente, o bloco de destino exige a mensagem ou já não precisa dela, ele chama o método ISourceBlock<TOutput>.ConsumeMessage ou ReleaseReservation, respectivamente. Reserva de mensagem normalmente é usada pelos tipos de bloco de fluxo de dados que operam em modo não greedy. O modo não greedy é explicado mais adiante neste documento. Em vez de reservar uma mensagem adiada, um bloco de destino também pode usar o método ISourceBlock<TOutput>.ConsumeMessage para tentar consumir diretamente a mensagem adiada.

Conclusão do bloco de fluxo de dados

Blocos de fluxo de dados também dão suporte ao conceito de conclusão. Um bloco de fluxo de dados que está no estado concluído não executa nenhum trabalho adicional. Cada bloco de fluxo de dados tem um objeto System.Threading.Tasks.Task associado, conhecido como uma tarefa de conclusão que representa o status de conclusão do bloco. Já que você pode esperar a conclusão de um objeto Task por meio do uso de tarefas de conclusão, espere pela conclusão de um ou mais nós terminais de uma rede de fluxo de dados. A interface IDataflowBlock define o método Complete que informa o bloco de fluxo de dados sobre uma solicitação para que ela seja concluída e a propriedade Completion que retorna a tarefa de conclusão para o bloco de fluxo de dados. ISourceBlock<TOutput> e ITargetBlock<TInput> herdam a interface IDataflowBlock.

Há duas maneiras de determinar se um bloco de fluxo de dados foi concluído sem erros, encontrou um ou mais erros ou foi cancelado. A primeira maneira é chamar o método Task.Wait na tarefa de conclusão em um bloco try-catch (Try-Catch no Visual Basic). O exemplo a seguir cria um objeto ActionBlock<TInput> que gera ArgumentOutOfRangeException se o valor de entrada for menor que zero. AggregateException é gerado quando este exemplo chama Wait na tarefa de conclusão. O ArgumentOutOfRangeException pode ser acessado pela propriedade InnerExceptions do objeto AggregateException.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Este exemplo demonstra o caso em que uma exceção fica sem tratamento no delegado de um bloco de fluxo de dados de execução. É recomendável que você trate exceções nos corpos de tais blocos. No entanto, se não for possível fazer isso, o bloco se comportará como se tivesse sido cancelado e não processará mensagens de entrada.

Quando um bloco de fluxo de dados é explicitamente cancelado, o objeto AggregateException contém OperationCanceledException na propriedade InnerExceptions. Para obter mais informações sobre o cancelamento de fluxo de dados, consulte a seção Habilitando o cancelamento.

A segunda maneira de determinar o status de conclusão de um bloco de fluxo de dados é usar uma continuação da tarefa de conclusão ou então usar os recursos de linguagem assíncrona do C# e Visual Basic para aguardar de modo assíncrono a tarefa de conclusão. O representante que você fornece ao método Task.ContinueWith usa um objeto Task que representa a tarefa antecedente. No caso da propriedade Completion, o próprio representante para a continuação recebe a tarefa de conclusão. O exemplo a seguir é semelhante ao anterior, exceto pelo fato de que ele também usa o método ContinueWith para criar uma tarefa de conclusão que imprime o status da operação de fluxo de dados geral.

// Create an ActionBlock<int> object that prints its input
// and throws ArgumentOutOfRangeException if the input
// is less than zero.
var throwIfNegative = new ActionBlock<int>(n =>
{
   Console.WriteLine("n = {0}", n);
   if (n < 0)
   {
      throw new ArgumentOutOfRangeException();
   }
});

// Create a continuation task that prints the overall
// task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(task =>
{
   Console.WriteLine("The status of the completion task is '{0}'.",
      task.Status);
});

// Post values to the block.
throwIfNegative.Post(0);
throwIfNegative.Post(-1);
throwIfNegative.Post(1);
throwIfNegative.Post(-2);
throwIfNegative.Complete();

// Wait for completion in a try/catch block.
try
{
   throwIfNegative.Completion.Wait();
}
catch (AggregateException ae)
{
   // If an unhandled exception occurs during dataflow processing, all
   // exceptions are propagated through an AggregateException object.
   ae.Handle(e =>
   {
      Console.WriteLine("Encountered {0}: {1}",
         e.GetType().Name, e.Message);
      return true;
   });
}

/* Output:
n = 0
n = -1
The status of the completion task is 'Faulted'.
Encountered ArgumentOutOfRangeException: Specified argument was out of the range
 of valid values.
*/
' Create an ActionBlock<int> object that prints its input
' and throws ArgumentOutOfRangeException if the input
' is less than zero.
Dim throwIfNegative = New ActionBlock(Of Integer)(Sub(n)
                                                      Console.WriteLine("n = {0}", n)
                                                      If n < 0 Then
                                                          Throw New ArgumentOutOfRangeException()
                                                      End If
                                                  End Sub)

' Create a continuation task that prints the overall 
' task status to the console when the block finishes.
throwIfNegative.Completion.ContinueWith(Sub(task) Console.WriteLine("The status of the completion task is '{0}'.", task.Status))

' Post values to the block.
throwIfNegative.Post(0)
throwIfNegative.Post(-1)
throwIfNegative.Post(1)
throwIfNegative.Post(-2)
throwIfNegative.Complete()

' Wait for completion in a try/catch block.
Try
    throwIfNegative.Completion.Wait()
Catch ae As AggregateException
    ' If an unhandled exception occurs during dataflow processing, all
    ' exceptions are propagated through an AggregateException object.
    ae.Handle(Function(e)
                  Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message)
                  Return True
              End Function)
End Try

'          Output:
'         n = 0
'         n = -1
'         The status of the completion task is 'Faulted'.
'         Encountered ArgumentOutOfRangeException: Specified argument was out of the range
'          of valid values.
'         

Você também pode usar propriedades como IsCanceled no corpo da tarefa de continuação para determinar informações adicionais sobre o status de conclusão de um bloco de fluxo de dados. Para obter mais informações sobre tarefas de continuação e como elas se relacionam com cancelamento e tratamento de erro, consulte Encadeamento de tarefas pelo uso de tarefas de continuação, Cancelamento de tarefas e Tratamento de exceções.

Tipos de bloco de fluxo de dados predefinidos

A biblioteca de fluxos de dados TPL fornece vários tipos de bloco de fluxo de dados predefinidos. Esses tipos são divididos em três categorias: blocos de buffer, blocos de execução e blocos de agrupamento. As seções a seguir descrevem os tipos de bloco que compõem essas categorias.

Blocos de buffer

Blocos de buffer armazenam dados para uso pelos consumidores de dados. A Biblioteca de fluxos de dados TPL fornece três tipos de blocos de buffer: System.Threading.Tasks.Dataflow.BufferBlock<T>, System.Threading.Tasks.Dataflow.BroadcastBlock<T> e System.Threading.Tasks.Dataflow.WriteOnceBlock<T>.

BufferBlock<T>

A classe BufferBlock<T> representa uma estrutura de sistema de mensagens assíncrona para fins gerais. Essa classe armazena uma fila PEPS (primeiro a entrar, primeiro a sair) de mensagens que podem ser gravadas por várias origens ou lidas por vários destinos. Quando um destino recebe uma mensagem de um objeto BufferBlock<T>, essa mensagem é removida da fila de mensagens. Portanto, embora um objeto BufferBlock<T> possa ter vários destinos, cada mensagem será recebida por apenas um destino. A classe BufferBlock<T> é útil quando você deseja passar várias mensagens para outro componente e esse componente deve receber cada uma das mensagens.

O exemplo básico a seguir posta diversos valores Int32 em um objeto BufferBlock<T> e, em seguida, lê esses valores de volta a partir desse objeto.

// Create a BufferBlock<int> object.
var bufferBlock = new BufferBlock<int>();

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   bufferBlock.Post(i);
}

// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(bufferBlock.Receive());
}

/* Output:
   0
   1
   2
 */
' Create a BufferBlock<int> object.
Dim bufferBlock = New BufferBlock(Of Integer)()

' Post several messages to the block.
For i As Integer = 0 To 2
    bufferBlock.Post(i)
Next i

' Receive the messages back from the block.
For i As Integer = 0 To 2
    Console.WriteLine(bufferBlock.Receive())
Next i

'          Output:
'            0
'            1
'            2
'          

Para obter um exemplo completo que demonstra como gravar mensagens e ler mensagens de um objeto BufferBlock<T>, confira Como gravar mensagens em um bloco de fluxo de dados e ler mensagens de um bloco de fluxo de dados.

BroadcastBlock<T>

A classe BroadcastBlock<T> é útil quando você precisa passar várias mensagens para outro componente mas esse componente precisa apenas do valor mais recente. Essa classe também é útil quando você deseja difundir uma mensagem para vários componentes.

O exemplo básico a seguir posta diversos valores Double em um objeto BroadcastBlock<T> e, em seguida, lê esses valores de volta a partir desse objeto por diversas vezes. Como valores não são removidos dos objetos BroadcastBlock<T> depois que eles são lidos, o mesmo valor está disponível todas as vezes.

// Create a BroadcastBlock<double> object.
var broadcastBlock = new BroadcastBlock<double>(null);

// Post a message to the block.
broadcastBlock.Post(Math.PI);

// Receive the messages back from the block several times.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(broadcastBlock.Receive());
}

/* Output:
   3.14159265358979
   3.14159265358979
   3.14159265358979
 */
' Create a BroadcastBlock<double> object.
Dim broadcastBlock = New BroadcastBlock(Of Double)(Nothing)

' Post a message to the block.
broadcastBlock.Post(Math.PI)

' Receive the messages back from the block several times.
For i As Integer = 0 To 2
    Console.WriteLine(broadcastBlock.Receive())
Next i

'          Output:
'            3.14159265358979
'            3.14159265358979
'            3.14159265358979
'          

Para obter um exemplo completo que demonstra como usar BroadcastBlock<T> para difundir uma mensagem a vários blocos de destino, confira Como especificar um agendador de tarefas em um bloco de fluxo de dados.

WriteOnceBlock<T>

A classe WriteOnceBlock<T> é semelhante à classe BroadcastBlock<T>, exceto no ponto em que um objeto WriteOnceBlock<T> pode ser gravado apenas uma vez. Você pode pensar em WriteOnceBlock<T> como sendo semelhante à palavra-chave de C# readonly (ReadOnly no Visual Basic), exceto pelo fato de que um objeto WriteOnceBlock<T> torna-se imutável depois de receber um valor em vez de uma construção. Como na classe BroadcastBlock<T>, quando um destino recebe uma mensagem de um objeto WriteOnceBlock<T>, essa mensagem não é removida do objeto. Portanto, vários destinos recebem uma cópia da mensagem. A classe WriteOnceBlock<T> é útil quando você deseja propagar apenas a primeira de várias mensagens.

O exemplo básico a seguir posta diversos valores String em um objeto WriteOnceBlock<T> e, em seguida, lê esse valor de volta a partir desse objeto. Como um objeto WriteOnceBlock<T> pode ser gravado apenas uma vez, após um objeto WriteOnceBlock<T> receber uma mensagem, ele descartará mensagens subsequentes.

// Create a WriteOnceBlock<string> object.
var writeOnceBlock = new WriteOnceBlock<string>(null);

// Post several messages to the block in parallel. The first
// message to be received is written to the block.
// Subsequent messages are discarded.
Parallel.Invoke(
   () => writeOnceBlock.Post("Message 1"),
   () => writeOnceBlock.Post("Message 2"),
   () => writeOnceBlock.Post("Message 3"));

// Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive());

/* Sample output:
   Message 2
 */
' Create a WriteOnceBlock<string> object.
Dim writeOnceBlock = New WriteOnceBlock(Of String)(Nothing)

' Post several messages to the block in parallel. The first 
' message to be received is written to the block. 
' Subsequent messages are discarded.
Parallel.Invoke(Function() writeOnceBlock.Post("Message 1"), Function() writeOnceBlock.Post("Message 2"), Function() writeOnceBlock.Post("Message 3"))

' Receive the message from the block.
Console.WriteLine(writeOnceBlock.Receive())

'          Sample output:
'            Message 2
'          

Para obter um exemplo completo que demonstra como usar WriteOnceBlock<T> para receber o valor da primeira operação que termina, confira Como desvincular blocos de fluxo de dados.

Blocos de execução

Blocos de execução chamam um delegado fornecido pelo usuário para cada parte dos dados recebidos. A Biblioteca de Fluxos de Dados TPL fornece três tipos de blocos de execução: ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput> e System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput>.

ActionBlock<T>

A classe ActionBlock<TInput> é um bloco de destino que chama um representante ao receber dados. Pense em um objeto ActionBlock<TInput> como um representante que é executado de modo assíncrono quando os dados se tornam disponíveis. O representante que você fornece a um objeto ActionBlock<TInput> pode ser do tipo Action<T> ou tipo System.Func<TInput, Task>. Quando você usa um objeto ActionBlock<TInput> com Action<T>, o processamento de cada elemento de entrada é considerado concluído quando o representante retorna. Quando você usa um objeto ActionBlock<TInput> com System.Func<TInput, Task>, o processamento de cada elemento de entrada é considerado concluído somente quando o objeto retornado Task for concluído. Usando esses dois mecanismos, você pode usar ActionBlock<TInput> no processamento síncrono e assíncrono de cada elemento de entrada.

O exemplo simples a seguir envia vários valores Int32 para um objeto ActionBlock<TInput>. O objeto ActionBlock<TInput> imprime esses valores para o console. Este exemplo, em seguida, define o bloco para o estado concluído e aguarda que todas as tarefas de fluxo de dados sejam concluídas.

// Create an ActionBlock<int> object that prints values
// to the console.
var actionBlock = new ActionBlock<int>(n => Console.WriteLine(n));

// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
   actionBlock.Post(i * 10);
}

// Set the block to the completed state and wait for all
// tasks to finish.
actionBlock.Complete();
actionBlock.Completion.Wait();

/* Output:
   0
   10
   20
 */
' Create an ActionBlock<int> object that prints values
' to the console.
Dim actionBlock = New ActionBlock(Of Integer)(Function(n) WriteLine(n))

' Post several messages to the block.
For i As Integer = 0 To 2
    actionBlock.Post(i * 10)
Next i

' Set the block to the completed state and wait for all 
' tasks to finish.
actionBlock.Complete()
actionBlock.Completion.Wait()

'          Output:
'            0
'            10
'            20
'          

Para concluir exemplos que demonstram como usar representantes com a classe ActionBlock<TInput>, confira Como executar ações quando um bloco de fluxo de dados recebe dados.

TransformBlock<TInput, TOutput>

A classe TransformBlock<TInput,TOutput> é semelhante à classe ActionBlock<TInput>, exceto que ela atua tanto como origem quanto destino. O representante que você passa para um objeto TransformBlock<TInput,TOutput> retorna um valor do tipo TOutput. O representante que você fornece a um objeto TransformBlock<TInput,TOutput> pode ser do tipo System.Func<TInput, TOutput> ou do tipo System.Func<TInput, Task<TOutput>>. Quando você usa um objeto TransformBlock<TInput,TOutput> com System.Func<TInput, TOutput>, o processamento de cada elemento de entrada é considerado concluído quando o representante retorna. Quando você usa um objeto TransformBlock<TInput,TOutput> com System.Func<TInput, Task<TOutput>>, o processamento de cada elemento de entrada é considerado concluído somente quando o objeto retornado Task<TResult> for concluído. Como acontece com o ActionBlock<TInput>, ao usar esses dois mecanismos, você pode usar TransformBlock<TInput,TOutput> no processamento síncrono e assíncrono de cada elemento de entrada.

O exemplo básico a seguir cria um objeto TransformBlock<TInput,TOutput> que computa a raiz quadrada de sua entrada. O objeto TransformBlock<TInput,TOutput> usa valores Int32 como entrada e produz valores Double como saída.

// Create a TransformBlock<int, double> object that
// computes the square root of its input.
var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

// Post several messages to the block.
transformBlock.Post(10);
transformBlock.Post(20);
transformBlock.Post(30);

// Read the output messages from the block.
for (int i = 0; i < 3; i++)
{
   Console.WriteLine(transformBlock.Receive());
}

/* Output:
   3.16227766016838
   4.47213595499958
   5.47722557505166
 */
' Create a TransformBlock<int, double> object that 
' computes the square root of its input.
Dim transformBlock = New TransformBlock(Of Integer, Double)(Function(n) Math.Sqrt(n))

' Post several messages to the block.
transformBlock.Post(10)
transformBlock.Post(20)
transformBlock.Post(30)

' Read the output messages from the block.
For i As Integer = 0 To 2
    Console.WriteLine(transformBlock.Receive())
Next i

'          Output:
'            3.16227766016838
'            4.47213595499958
'            5.47722557505166
'          

Para obter exemplos completos que usam TransformBlock<TInput,TOutput> em uma rede de blocos de fluxo de dados que executa o processamento de imagem em um Aplicativo do Windows Forms, confira Explicação passo a passo: usar o fluxo de dados em um Aplicativo Windows Forms.

TransformManyBlock<TInput, TOutput>

A classe TransformManyBlock<TInput,TOutput> é semelhante à classe TransformBlock<TInput,TOutput>, exceto no ponto em que TransformManyBlock<TInput,TOutput> produz zero ou mais valores de saída para cada valor de entrada, em vez de apenas um valor para cada valor de entrada de saída. O representante que você fornece a um objeto TransformManyBlock<TInput,TOutput> pode ser do tipo System.Func<TInput, IEnumerable<TOutput>> ou do tipo System.Func<TInput, Task<IEnumerable<TOutput>>>. Quando você usa um objeto TransformManyBlock<TInput,TOutput> com System.Func<TInput, IEnumerable<TOutput>>, o processamento de cada elemento de entrada é considerado concluído quando o representante retorna. Quando você usa um objeto TransformManyBlock<TInput,TOutput> com System.Func<TInput, Task<IEnumerable<TOutput>>>, o processamento de cada elemento de entrada é considerado concluído somente quando o objeto retornado System.Threading.Tasks.Task<IEnumerable<TOutput>> for concluído.

O exemplo básico a seguir cria um objeto TransformManyBlock<TInput,TOutput> que divide cadeias de caracteres em suas sequências de caracteres individuais. O objeto TransformManyBlock<TInput,TOutput> usa valores String como entrada e produz valores Char como saída.

// Create a TransformManyBlock<string, char> object that splits
// a string into its individual characters.
var transformManyBlock = new TransformManyBlock<string, char>(
   s => s.ToCharArray());

// Post two messages to the first block.
transformManyBlock.Post("Hello");
transformManyBlock.Post("World");

// Receive all output values from the block.
for (int i = 0; i < ("Hello" + "World").Length; i++)
{
   Console.WriteLine(transformManyBlock.Receive());
}

/* Output:
   H
   e
   l
   l
   o
   W
   o
   r
   l
   d
 */
' Create a TransformManyBlock<string, char> object that splits
' a string into its individual characters.
Dim transformManyBlock = New TransformManyBlock(Of String, Char)(Function(s) s.ToCharArray())

' Post two messages to the first block.
transformManyBlock.Post("Hello")
transformManyBlock.Post("World")

' Receive all output values from the block.
For i As Integer = 0 To ("Hello" & "World").Length - 1
    Console.WriteLine(transformManyBlock.Receive())
Next i

'          Output:
'            H
'            e
'            l
'            l
'            o
'            W
'            o
'            r
'            l
'            d
'          

Para obter exemplos completos que usam TransformManyBlock<TInput,TOutput> para produzir várias saídas independentes para cada entrada em um pipeline de fluxo de dados, confira Explicação passo a passo: criar um pipeline de fluxo de dados.

Grau de paralelismo

Cada objeto ActionBlock<TInput>, TransformBlock<TInput,TOutput> e TransformManyBlock<TInput,TOutput> armazena mensagens de entrada até que o bloco esteja pronto para processá-las. Por padrão, essas classes processam mensagens na ordem em que elas são recebidas, uma mensagem por vez. Você também pode especificar o grau de paralelismo para habilitar os objetos ActionBlock<TInput>, TransformBlock<TInput,TOutput> e TransformManyBlock<TInput,TOutput> para processar várias mensagens simultaneamente. Para obter mais informações sobre a execução simultânea, consulte a seção Especificando o grau de paralelismo, presente neste documento. Para obter um exemplo que define o grau de paralelismo para habilitar um bloco de fluxo de dados de execução a processar mais de uma mensagem por vez, consulte Como especificar o grau de paralelismo em um bloco de fluxo de dados.

Resumo de tipos de delegados

A tabela a seguir resume os tipos de representantes que você pode fornecer aos objetos ActionBlock<TInput>, TransformBlock<TInput,TOutput> e TransformManyBlock<TInput,TOutput>. Esta tabela também especifica se o tipo de delegado opera de forma síncrona ou assíncrona.

Tipo Tipo de delegado síncrono Tipo de delegado assíncrono
ActionBlock<TInput> System.Action System.Func<TInput, Task>
TransformBlock<TInput,TOutput> System.Func<TInput, TOutput> System.Func<TInput, Task<TOutput>>
TransformManyBlock<TInput,TOutput> System.Func<TInput, IEnumerable<TOutput>> System.Func<TInput, Task<IEnumerable<TOutput>>>

Você também pode usar expressões lambda ao trabalhar com tipos de blocos de execução. Para obter um exemplo que mostra como usar uma expressão lambda com um bloco de execução, consulte Como executar ações quando um bloco de fluxo de dados recebe dados.

Blocos de agrupamento

Blocos de agrupamento combinam dados de uma ou mais origens e sob várias restrições. A Biblioteca de fluxos de dados TPL fornece três tipos de blocos de junção: BatchBlock<T>, JoinBlock<T1,T2> e BatchedJoinBlock<T1,T2>.

BatchBlock<T>

A classe BatchBlock<T> combina os conjuntos de dados de entrada, que são conhecidos como lotes, em matrizes de dados de saída. Especifique o tamanho de cada lote ao criar um objeto BatchBlock<T>. Quando o objeto BatchBlock<T> recebe a contagem especificada dos elementos de entrada, ele propaga, de modo assíncrono, uma matriz que contém esses elementos. Se um objeto BatchBlock<T> estiver definido com o estado concluído, mas não contiver elementos suficientes para formar um lote, ele propagará uma matriz final com os elementos de entrada restantes.

A classe BatchBlock<T> opera em qualquer um dos modos greedy ou não greedy. No modo greedy, que é o padrão, um objeto BatchBlock<T> aceita todas as mensagens que são oferecidas a ele e propaga uma matriz, depois de receber a contagem especificada de elementos. No modo não greedy, um objeto BatchBlock<T> adia todas as mensagens de entrada até que origens suficientes tenham oferecido mensagens para que o bloco forme um lote. O modo greedy normalmente tem melhor desempenho do que o modo não greedy porque ele requer menos sobrecarga de processamento. No entanto, você pode usar o modo não greedy quando você deve coordenar o consumo de várias origens de maneira atômica. Especifique o modo não greedy definindo Greedy como False no parâmetro dataflowBlockOptions no constructo BatchBlock<T>.

O exemplo básico a seguir posta diversos valores Int32 em um objeto BatchBlock<T> que contém dez elementos em um lote. Para garantir que todos os valores sejam propagados fora do BatchBlock<T>, este exemplo chama o método Complete. O método Complete define o objeto BatchBlock<T> com o estado concluído e, assim, o objeto BatchBlock<T> propaga quaisquer elementos restantes como um lote final.

// Create a BatchBlock<int> object that holds ten
// elements per batch.
var batchBlock = new BatchBlock<int>(10);

// Post several values to the block.
for (int i = 0; i < 13; i++)
{
   batchBlock.Post(i);
}
// Set the block to the completed state. This causes
// the block to propagate out any remaining
// values as a final batch.
batchBlock.Complete();

// Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.",
   batchBlock.Receive().Sum());

Console.WriteLine("The sum of the elements in batch 2 is {0}.",
   batchBlock.Receive().Sum());

/* Output:
   The sum of the elements in batch 1 is 45.
   The sum of the elements in batch 2 is 33.
 */
' Create a BatchBlock<int> object that holds ten
' elements per batch.
Dim batchBlock = New BatchBlock(Of Integer)(10)

' Post several values to the block.
For i As Integer = 0 To 12
    batchBlock.Post(i)
Next i
' Set the block to the completed state. This causes
' the block to propagate out any remaining
' values as a final batch.
batchBlock.Complete()

' Print the sum of both batches.

Console.WriteLine("The sum of the elements in batch 1 is {0}.", batchBlock.Receive().Sum())

Console.WriteLine("The sum of the elements in batch 2 is {0}.", batchBlock.Receive().Sum())

'          Output:
'            The sum of the elements in batch 1 is 45.
'            The sum of the elements in batch 2 is 33.
'          

Para obter um exemplo completo que usa BatchBlock<T> para aumentar a eficiência das operações de inserção do banco de dados, confira Explicação passo a passo: usar BatchBlock e BatchedJoinBlock para melhorar a eficiência.

JoinBlock<T1, T2, ...>

As classes JoinBlock<T1,T2> e JoinBlock<T1,T2,T3> coletam elementos de entrada e os propaga fora dos objetos System.Tuple<T1,T2> ou System.Tuple<T1,T2,T3> que contêm esses elementos. As classes JoinBlock<T1,T2> e JoinBlock<T1,T2,T3> não são herdadas de ITargetBlock<TInput>. Em vez disso, eles fornecem propriedades, Target1, Target2 e Target3, que implementam ITargetBlock<TInput>.

Da mesma forma que BatchBlock<T>, JoinBlock<T1,T2> e JoinBlock<T1,T2,T3> operam em qualquer um dos modos greedy ou não greedy. No modo greedy, que é o padrão, um objeto JoinBlock<T1,T2> ou JoinBlock<T1,T2,T3> aceita todas as mensagens que são oferecidas a ele e propaga uma tupla depois de cada um dos seus destinos receber pelo menos uma mensagem. No modo não greedy, um objeto JoinBlock<T1,T2> ou JoinBlock<T1,T2,T3> adia todas as mensagens de entrada até que os dados que são necessários para criar uma tupla tenham sido oferecidos para todos os destinos. Neste ponto, o bloco se envolve em um protocolo 2PC para recuperar atomicamente todos os itens obrigatórios das origens. Este adiamento possibilita que outra entidade consuma os dados e, enquanto isso, permite que o sistema em geral avance.

O exemplo básico a seguir demonstra um caso em que um objeto JoinBlock<T1,T2,T3> requer vários dados para calcular um valor. Este exemplo cria um objeto JoinBlock<T1,T2,T3> que requer dois valores Int32 e um valor Char para executar uma operação aritmética.

// Create a JoinBlock<int, int, char> object that requires
// two numbers and an operator.
var joinBlock = new JoinBlock<int, int, char>();

// Post two values to each target of the join.

joinBlock.Target1.Post(3);
joinBlock.Target1.Post(6);

joinBlock.Target2.Post(5);
joinBlock.Target2.Post(4);

joinBlock.Target3.Post('+');
joinBlock.Target3.Post('-');

// Receive each group of values and apply the operator part
// to the number parts.

for (int i = 0; i < 2; i++)
{
   var data = joinBlock.Receive();
   switch (data.Item3)
   {
      case '+':
         Console.WriteLine("{0} + {1} = {2}",
            data.Item1, data.Item2, data.Item1 + data.Item2);
         break;
      case '-':
         Console.WriteLine("{0} - {1} = {2}",
            data.Item1, data.Item2, data.Item1 - data.Item2);
         break;
      default:
         Console.WriteLine("Unknown operator '{0}'.", data.Item3);
         break;
   }
}

/* Output:
   3 + 5 = 8
   6 - 4 = 2
 */
' Create a JoinBlock<int, int, char> object that requires
' two numbers and an operator.
Dim joinBlock = New JoinBlock(Of Integer, Integer, Char)()

' Post two values to each target of the join.

joinBlock.Target1.Post(3)
joinBlock.Target1.Post(6)

joinBlock.Target2.Post(5)
joinBlock.Target2.Post(4)

joinBlock.Target3.Post("+"c)
joinBlock.Target3.Post("-"c)

' Receive each group of values and apply the operator part
' to the number parts.

For i As Integer = 0 To 1
    Dim data = joinBlock.Receive()
    Select Case data.Item3
        Case "+"c
            Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2)
        Case "-"c
            Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2)
        Case Else
            Console.WriteLine("Unknown operator '{0}'.", data.Item3)
    End Select
Next i

'          Output:
'            3 + 5 = 8
'            6 - 4 = 2
'          

Para obter um exemplo completo que usa objetos JoinBlock<T1,T2> no modo não greedy para compartilhar um recurso cooperativamente, confira Como usar JoinBlock para ler dados de várias origens.

BatchedJoinBlock<T1, T2, ...>

As classes BatchedJoinBlock<T1,T2> e BatchedJoinBlock<T1,T2,T3> coletam lotes de elementos de entrada e os propaga fora dos objetos System.Tuple(IList(T1), IList(T2)) ou System.Tuple(IList(T1), IList(T2), IList(T3)) que contêm esses elementos. Pense em BatchedJoinBlock<T1,T2> como uma combinação de BatchBlock<T> e JoinBlock<T1,T2>. Especifique o tamanho de cada lote ao criar um objeto BatchedJoinBlock<T1,T2>. BatchedJoinBlock<T1,T2> também fornece propriedades, Target1 e Target2, que implementam ITargetBlock<TInput>. Quando a contagem especificada dos elementos de entrada é recebida do conjunto de todos os destinos, o objeto BatchedJoinBlock<T1,T2> propaga, de modo assíncrono, um objeto System.Tuple(IList(T1), IList(T2)) que contém esses elementos.

O exemplo básico a seguir cria um objeto BatchedJoinBlock<T1,T2> que contém resultados, valores Int32 e erros que são objetos Exception. Este exemplo executa várias operações e grava os resultados na propriedade Target1 e os erros na propriedade Target2 do objeto BatchedJoinBlock<T1,T2>. Já que não se sabe com antecedência a contagem de operações bem-sucedidas e com falha, os objetos IList<T> permitem que cada destino receba zero ou mais valores.

// For demonstration, create a Func<int, int> that
// returns its argument, or throws ArgumentOutOfRangeException
// if the argument is less than zero.
Func<int, int> DoWork = n =>
{
   if (n < 0)
      throw new ArgumentOutOfRangeException();
   return n;
};

// Create a BatchedJoinBlock<int, Exception> object that holds
// seven elements per batch.
var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

// Post several items to the block.
foreach (int i in new int[] { 5, 6, -7, -22, 13, 55, 0 })
{
   try
   {
      // Post the result of the worker to the
      // first target of the block.
      batchedJoinBlock.Target1.Post(DoWork(i));
   }
   catch (ArgumentOutOfRangeException e)
   {
      // If an error occurred, post the Exception to the
      // second target of the block.
      batchedJoinBlock.Target2.Post(e);
   }
}

// Read the results from the block.
var results = batchedJoinBlock.Receive();

// Print the results to the console.

// Print the results.
foreach (int n in results.Item1)
{
   Console.WriteLine(n);
}
// Print failures.
foreach (Exception e in results.Item2)
{
   Console.WriteLine(e.Message);
}

/* Output:
   5
   6
   13
   55
   0
   Specified argument was out of the range of valid values.
   Specified argument was out of the range of valid values.
 */
' For demonstration, create a Func<int, int> that 
' returns its argument, or throws ArgumentOutOfRangeException
' if the argument is less than zero.
Dim DoWork As Func(Of Integer, Integer) = Function(n)
                                              If n < 0 Then
                                                  Throw New ArgumentOutOfRangeException()
                                              End If
                                              Return n
                                          End Function

' Create a BatchedJoinBlock<int, Exception> object that holds 
' seven elements per batch.
Dim batchedJoinBlock = New BatchedJoinBlock(Of Integer, Exception)(7)

' Post several items to the block.
For Each i As Integer In New Integer() {5, 6, -7, -22, 13, 55, 0}
    Try
        ' Post the result of the worker to the 
        ' first target of the block.
        batchedJoinBlock.Target1.Post(DoWork(i))
    Catch e As ArgumentOutOfRangeException
        ' If an error occurred, post the Exception to the 
        ' second target of the block.
        batchedJoinBlock.Target2.Post(e)
    End Try
Next i

' Read the results from the block.
Dim results = batchedJoinBlock.Receive()

' Print the results to the console.

' Print the results.
For Each n As Integer In results.Item1
    Console.WriteLine(n)
Next n
' Print failures.
For Each e As Exception In results.Item2
    Console.WriteLine(e.Message)
Next e

'          Output:
'            5
'            6
'            13
'            55
'            0
'            Specified argument was out of the range of valid values.
'            Specified argument was out of the range of valid values.
'          

Para obter um exemplo completo que usa BatchedJoinBlock<T1,T2> para capturar os resultados e todas as exceções que ocorrem enquanto o programa lê de um banco de dados, confira Explicação passo a passo: usar BatchBlock e BatchedJoinBlock para aumentar a eficiência.

Configurar o comportamento de bloco de fluxo de dados

Você pode habilitar as opções adicionais, fornecendo um objeto System.Threading.Tasks.Dataflow.DataflowBlockOptions ao constructo de tipos de bloco de fluxo de dados. Essas opções controlam comportamentos como o do agendador que gerencia a tarefa subjacente e o grau de paralelismo. O DataflowBlockOptions também tem tipos derivados que especificam comportamentos que são específicos de determinados tipos de bloco de fluxo de dados. A tabela a seguir resume que tipo de opções é associado a cada tipo de bloco de fluxo de dados.

Tipo de bloco de fluxo de dados TipoDataflowBlockOptions
BufferBlock<T> DataflowBlockOptions
BroadcastBlock<T> DataflowBlockOptions
WriteOnceBlock<T> DataflowBlockOptions
ActionBlock<TInput> ExecutionDataflowBlockOptions
TransformBlock<TInput,TOutput> ExecutionDataflowBlockOptions
TransformManyBlock<TInput,TOutput> ExecutionDataflowBlockOptions
BatchBlock<T> GroupingDataflowBlockOptions
JoinBlock<T1,T2> GroupingDataflowBlockOptions
BatchedJoinBlock<T1,T2> GroupingDataflowBlockOptions

As seções a seguir fornecem mais informações sobre os tipos importantes de opções de blocos de fluxo de dados que estão disponíveis por meio das classes System.Threading.Tasks.Dataflow.DataflowBlockOptions, System.Threading.Tasks.Dataflow.ExecutionDataflowBlockOptions e System.Threading.Tasks.Dataflow.GroupingDataflowBlockOptions.

Especificando o Agendador de Tarefas

Cada bloco de fluxo de dados predefinidos usa o mecanismo de agendamento de tarefas TPL para executar atividades como a propagação de dados para um destino, o recebimento de dados de uma origem e a execução de delegados definidos pelo usuário quando os dados tornam-se disponíveis. TaskScheduler é uma classe abstrata que representa um agendador de tarefas que coloca as tarefas na fila, em threads. O agendador de tarefas padrão, Default, usa a classe ThreadPool para colocar o trabalho na fila e executá-lo. Você pode substituir o agendador de tarefas padrão definindo a propriedade TaskScheduler ao construir um objeto de bloco de fluxo de dados.

Quando o mesmo agendador de tarefas gerencia vários blocos de fluxo de dados, ele pode impor políticas entre eles. Por exemplo, se cada um dos vários blocos de fluxo de dados forem configurados para usar como destino o agendador exclusivo do mesmo objeto ConcurrentExclusiveSchedulerPair, todo o trabalho que é executado entre esses blocos será serializado. De modo similar, se esses blocos são configurados para usar como destino o agendador simultâneo do mesmo objeto ConcurrentExclusiveSchedulerPair e esse agendador é configurado para ter um nível de simultaneidade máximo, todo o trabalho desses blocos é limitado a esse número de operações simultâneas. Para obter um exemplo que usa a classe ConcurrentExclusiveSchedulerPair para habilitar a realização de operações de leitura em paralelo mas mantém as operações de gravação ocorrendo de modo exclusivo com relação a todas as outras operações, confira Como especificar um agendador de tarefas em um bloco de fluxo de dados. Para saber mais sobre os agendadores de tarefas na TPL, confira o tópico da classe TaskScheduler.

Especificando o grau de paralelismo

Por padrão, os três tipos de blocos de execução que a biblioteca de fluxo de dados TPL fornece, ActionBlock<TInput>, TransformBlock<TInput,TOutput> e TransformManyBlock<TInput,TOutput>, processam uma mensagem por vez. Esses tipos de bloco de fluxo de dados também processam mensagens na ordem em que elas são recebidas. Para habilitar esses blocos de fluxo de dados para processar mensagens simultaneamente, defina a propriedade ExecutionDataflowBlockOptions.MaxDegreeOfParallelism ao construir o objeto de bloco de fluxo de dados.

O valor padrão de MaxDegreeOfParallelism é 1 e isso garante que o bloco de fluxo de dados processa uma mensagem por vez. Definir essa propriedade como um valor maior que 1 permite que o bloco de fluxo de dados processe várias mensagens simultaneamente. Definir essa propriedade como DataflowBlockOptions.Unbounded permite que o agendador de tarefas subjacente gerencie o grau máximo de simultaneidade.

Importante

Quando você especifica um grau máximo de paralelismo maior do que 1, várias mensagens são processadas simultaneamente e, portanto, as mensagens talvez não sejam processadas na ordem em que são recebidas. A ordem na qual as mensagens são a saída do bloco é, no entanto, a mesma na qual elas são recebidas.

Como a propriedade MaxDegreeOfParallelism representa o grau máximo de paralelismo, o bloco de fluxo de dados pode executar com um menor grau de paralelismo do que o especificado. O bloco de fluxo de dados pode usar um grau menor de paralelismo para atender aos seus requisitos funcionais ou porque há uma falta de recursos do sistema disponíveis. Um bloco de fluxo de dados nunca escolherá mais paralelismo do que você especificar.

O valor da propriedade MaxDegreeOfParallelism é exclusivo para cada objeto de bloco de fluxo de dados. Por exemplo, se cada um de quatro objetos de bloco de fluxo de dados especificar 1 para o grau máximo de paralelismo, todos esses objetos poderão potencialmente ser executados em paralelo.

Para obter um exemplo que define o grau máximo de paralelismo para permitir que operações demoradas ocorram em paralelo, consulte Como especificar o grau de paralelismo em um bloco de fluxo de dados.

Especificando o número de mensagens por tarefa

Os tipos de bloco de fluxo de dados predefinidos usam tarefas para processar vários elementos de entrada. Isso ajuda a minimizar o número de objetos de tarefa que são necessários para processar dados, o que permite que aplicativos sejam executados com mais eficiência. No entanto, quando as tarefas de um conjunto de blocos de fluxo de dados estão processando dados, talvez seja necessário que as tarefas de outros blocos de fluxo de dados esperem o tempo de processamento colocando mensagens na fila. Para permitir melhor integridade entre tarefas de fluxo de dados, defina a propriedade MaxMessagesPerTask. Quando MaxMessagesPerTask é definido como DataflowBlockOptions.Unbounded, que é o padrão, a tarefa usada por um bloco de fluxo de dados processa as mensagens que estiverem disponíveis. Quando MaxMessagesPerTask é definido com um valor diferente de Unbounded, o bloco de fluxo de dados processa no máximo esse número de mensagens por objeto Task. Embora definir a propriedade MaxMessagesPerTask possa aumentar a honestidade entre tarefas, isso pode fazer com que o sistema crie mais tarefas do que o necessário, o que pode diminuir o desempenho.

Habilitando o cancelamento

A TPL fornece um mecanismo que permite que as tarefas coordenem o cancelamento de forma cooperativa. Para habilitar os blocos de fluxo de dados a participarem desse mecanismo de cancelamento, defina a propriedade CancellationToken. Quando esse objeto CancellationToken é definido como estado cancelado, todos os blocos de fluxo de dados que monitoram esse token concluem a execução do seu item atual, mas não iniciam o processamento de itens subsequentes. Esses blocos de fluxo de dados também limpam quaisquer mensagens armazenadas em buffer, cancelam as conexões com quaisquer blocos de origem e de destino e fazem a transição para o estado cancelado. Ao fazer a transição para o estado cancelado, a propriedade Completion tem a propriedade Status definida como Canceled, a menos que uma exceção ocorra durante o processamento. Nesse caso, Status está definido como Faulted.

Para obter um exemplo que demonstra como usar cancelamento em um aplicativo Windows Forms, consulte Como cancelar um bloco de fluxo de dados. Para obter mais informações sobre cancelamento na TPL, consulte Cancelamento da tarefa.

Especificando comportamento greedy versus não greedy

Vários tipos de bloco de fluxo de dados de agrupamento podem operar em qualquer um dos modos greedy ou não greedy. Por padrão, os tipos de bloco de fluxo de dados predefinidos operam em modo greedy.

Para tipos de bloco de junção como JoinBlock<T1,T2>, o modo greedy significa que o bloco aceita dados imediatamente, mesmo que os dados correspondentes com os quais se pretende realizar a junção não estejam disponíveis. O modo não greedy significa que o bloco adia todas as mensagens de entrada até que uma esteja disponível em cada um de seus destinos para concluir a junção. Se qualquer uma das mensagens adiadas não estiver mais disponível, o bloco de junção liberará todas as mensagens adiadas e reiniciará o processo. Para a classe BatchBlock<T>, o comportamento greedy e não greedy é semelhante, exceto pelo fato de que no modo não greedy, um objeto BatchBlock<T> adia todas as mensagens de entrada até que um número suficiente delas estejam disponíveis de origens distintas para completar um lote.

Para especificar o modo não greedy para um bloco de fluxo de dados, defina Greedy como False. Para obter um exemplo que demonstra como usar o modo não greedy para habilitar vários blocos de junção compartilhar uma fonte de dados com mais eficiência, consulte Como usar JoinBlock para ler dados de várias fontes.

Blocos de fluxo de dados personalizados

Embora a biblioteca de fluxo de dados TPL forneça muitos tipos de bloco predefinidos, você pode criar tipos de bloco adicionais que tenham um comportamento personalizado. Implemente as interfaces ISourceBlock<TOutput> ou ITargetBlock<TInput> diretamente ou use o método Encapsulate para criar um bloco complexo que encapsula o comportamento dos tipos de bloco existentes. Para obter exemplos que mostram como implementar a funcionalidade de bloco de fluxo de dados personalizado, consulte Passo a passo: criando um tipo de bloco de fluxo de dados personalizado.

Título Descrição
Como: gravar mensagens em um bloco de fluxo de dados e ler mensagens dele Demonstra como gravar e ler mensagens de um objeto BufferBlock<T>.
Como: implementar um padrão de fluxo de dados de produtor-consumidor Descreve como usar o modelo de fluxo de dados para implementar um padrão de produtor-consumidor, no qual o produtor envia mensagens para um bloco de fluxo de dados e o consumidor lê as mensagens desse bloco.
Como: executar ações quando um bloco de fluxo de dados recebe dados Descreve como fornecer representantes para os tipos de bloco de fluxo de dados de execução, ActionBlock<TInput>, TransformBlock<TInput,TOutput> e TransformManyBlock<TInput,TOutput>.
Passo a passo: criar um pipeline de fluxo de dados Descreve como criar um pipeline de fluxo de dados que baixa texto da Web e executa operações nesse texto.
Como: desvincular blocos de fluxo de dados Demonstra como usar o método LinkTo para desvincular um bloco de destino de sua origem depois que a origem oferecer uma mensagem para o destino.
Passo a passo: usar um fluxo de dados em um aplicativo do Windows Forms Demonstra como criar uma rede de blocos de fluxo de dados que executam o processamento de imagens em um Aplicativo do Windows Forms.
Como: cancelar um bloco de fluxo de dados Demonstra como usar cancelamento em um Aplicativo do Windows Forms.
Como: usar JoinBlock para ler dados de várias fontes Explica como usar a classe JoinBlock<T1,T2> para executar uma operação quando os dados estão disponíveis de várias fontes e como usar o modo não greedy para habilitar vários blocos de junção a compartilharem uma fonte de dados com mais eficiência.
Como: especificar o grau de paralelismo em um bloco de fluxo de dados Descreve como definir a propriedade MaxDegreeOfParallelism para habilitar um bloco de fluxo de execução a processar mais de uma mensagem por vez.
Como: especificar um agendador de tarefas em um bloco de fluxo de dados Demonstra como associar um agendador de tarefas específico quando você usa o fluxo de dados em seu aplicativo.
Passo a passo: usando BatchBlock e BatchedJoinBlock para melhorar a eficiência Descreve como usar a classe BatchBlock<T> para aumentar a eficiência das operações de inserção de banco de dados e como usar a classe BatchedJoinBlock<T1,T2> para capturar os resultados e também quaisquer exceções que ocorram enquanto o programa lê de um banco de dados.
Passo a passo: criando um tipo de bloco de fluxo de dados personalizado Demonstra duas maneiras de criar um tipo de bloco de fluxo de dados que implementa o comportamento personalizado.
Biblioteca de tarefas paralelas (TPL) Apresenta a TPL, uma biblioteca que simplifica a programação paralela e simultânea em aplicativos .NET Framework.