Compartilhar via


Como: implementar um padrão de fluxo de dados de produtor-consumidor

Neste artigo, você aprenderá como usar a biblioteca de fluxo de dados TPL para implementar um padrão de produtor-consumidor. Nesse padrão, o produtor envia mensagens a um bloco de mensagens e o consumidor lê mensagens nesse bloco.

Observação

A Biblioteca de Fluxo de Dados TPL (o namespace System.Threading.Tasks.Dataflow) não é distribuída com o .NET. Para instalar o namespace System.Threading.Tasks.Dataflow no Visual Studio, abra o projeto, escolha Gerenciar Pacotes NuGet no menu Projeto e pesquise online o pacote System.Threading.Tasks.Dataflow. Como alternativa, instale-o usando a CLI do .NET Core e execute dotnet add package System.Threading.Tasks.Dataflow.

Exemplo

O exemplo a seguir demonstra um modelo básico de produtor-consumidor que usa o fluxo de dados. O método Produce grava matrizes que contêm bytes de dados aleatórios em um objeto System.Threading.Tasks.Dataflow.ITargetBlock<TInput> e o método Consume lê bytes de um objeto System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>. Ao agir nas interfaces ISourceBlock<TOutput> e ITargetBlock<TInput>, em vez de seus tipos derivados, você pode gravar o código reutilizável que pode agir em uma variedade de tipos de bloco de fluxo de dados. Este exemplo usa a classe BufferBlock<T>. Como a classe BufferBlock<T> atua como um bloco de origem e um bloco de destino, o produtor e consumidor podem usar um objeto compartilhado para transferir dados.

O método Produce chama o método Post em um loop para gravar dados de forma síncrona no bloco de destino. Após o método Produce gravar todos os dados no bloco de destino, ele chama o método Complete para indicar que o bloco nunca terá dados adicionais disponíveis. O método Consume usa os operadores async e await (Async e Await em Visual Basic) para calcular de forma assíncrona o número total de bytes que são recebidos do objeto ISourceBlock<TOutput>. Para agir de forma assíncrona, o método Consume chama o método OutputAvailableAsync para receber uma notificação quando o bloco de origem tiver dados disponíveis e o bloco de origem nunca terá outros dados disponíveis.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

Programação robusta

O exemplo anterior usa apenas um consumidor para processar os dados de origem. Se você tiver vários consumidores em seu aplicativo, use o método TryReceive para ler dados do bloco de origem, conforme mostrado no exemplo a seguir.

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

O método TryReceive retorna False quando não há dados disponíveis. Quando vários consumidores têm que acessar simultaneamente o bloco de origem, esse mecanismo garante que os dados ainda estarão disponíveis após a chamada para o OutputAvailableAsync.

Confira também