Como: implementar um padrão de fluxo de dados de produtor-consumidor
Neste artigo, você aprenderá como usar a biblioteca de fluxo de dados TPL para implementar um padrão de produtor-consumidor. Nesse padrão, o produtor envia mensagens a um bloco de mensagens e o consumidor lê mensagens nesse bloco.
Observação
A Biblioteca de Fluxo de Dados TPL (o namespace System.Threading.Tasks.Dataflow) não é distribuída com o .NET. Para instalar o namespace System.Threading.Tasks.Dataflow no Visual Studio, abra o projeto, escolha Gerenciar Pacotes NuGet no menu Projeto e pesquise online o pacote System.Threading.Tasks.Dataflow
. Como alternativa, instale-o usando a CLI do .NET Core e execute dotnet add package System.Threading.Tasks.Dataflow
.
Exemplo
O exemplo a seguir demonstra um modelo básico de produtor-consumidor que usa o fluxo de dados. O método Produce
grava matrizes que contêm bytes de dados aleatórios em um objeto System.Threading.Tasks.Dataflow.ITargetBlock<TInput> e o método Consume
lê bytes de um objeto System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>. Ao agir nas interfaces ISourceBlock<TOutput> e ITargetBlock<TInput>, em vez de seus tipos derivados, você pode gravar o código reutilizável que pode agir em uma variedade de tipos de bloco de fluxo de dados. Este exemplo usa a classe BufferBlock<T>. Como a classe BufferBlock<T> atua como um bloco de origem e um bloco de destino, o produtor e consumidor podem usar um objeto compartilhado para transferir dados.
O método Produce
chama o método Post em um loop para gravar dados de forma síncrona no bloco de destino. Após o método Produce
gravar todos os dados no bloco de destino, ele chama o método Complete para indicar que o bloco nunca terá dados adicionais disponíveis. O método Consume
usa os operadores async e await (Async e Await em Visual Basic) para calcular de forma assíncrona o número total de bytes que são recebidos do objeto ISourceBlock<TOutput>. Para agir de forma assíncrona, o método Consume
chama o método OutputAvailableAsync para receber uma notificação quando o bloco de origem tiver dados disponíveis e o bloco de origem nunca terá outros dados disponíveis.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class DataflowProducerConsumer
{
static void Produce(ITargetBlock<byte[]> target)
{
var rand = new Random();
for (int i = 0; i < 100; ++ i)
{
var buffer = new byte[1024];
rand.NextBytes(buffer);
target.Post(buffer);
}
target.Complete();
}
static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
byte[] data = await source.ReceiveAsync();
bytesProcessed += data.Length;
}
return bytesProcessed;
}
static async Task Main()
{
var buffer = new BufferBlock<byte[]>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer);
var bytesProcessed = await consumerTask;
Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}
}
// Sample output:
// Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Friend Class DataflowProducerConsumer
Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
Dim rand As New Random()
For i As Integer = 0 To 99
Dim buffer(1023) As Byte
rand.NextBytes(buffer)
target.Post(buffer)
Next i
target.Complete()
End Sub
Private Shared Async Function ConsumeAsync(
ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte = Await source.ReceiveAsync()
bytesProcessed += data.Length
Loop
Return bytesProcessed
End Function
Shared Sub Main()
Dim buffer = New BufferBlock(Of Byte())()
Dim consumer = ConsumeAsync(buffer)
Produce(buffer)
Dim result = consumer.GetAwaiter().GetResult()
Console.WriteLine($"Processed {result:#,#} bytes.")
End Sub
End Class
' Sample output:
' Processed 102,400 bytes.
Programação robusta
O exemplo anterior usa apenas um consumidor para processar os dados de origem. Se você tiver vários consumidores em seu aplicativo, use o método TryReceive para ler dados do bloco de origem, conforme mostrado no exemplo a seguir.
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
while (source.TryReceive(out byte[] data))
{
bytesProcessed += data.Length;
}
}
return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte
Do While source.TryReceive(data)
bytesProcessed += data.Length
Loop
Loop
Return bytesProcessed
End Function
O método TryReceive retorna False
quando não há dados disponíveis. Quando vários consumidores têm que acessar simultaneamente o bloco de origem, esse mecanismo garante que os dados ainda estarão disponíveis após a chamada para o OutputAvailableAsync.