Procedimiento: Implementación de un modelo de flujo de datos productor-consumidor
En este artículo, aprenderá a utilizar la biblioteca de flujos de datos TPL para implementar un modelo productor-consumidor. En este modelo, el productor envía mensajes a un bloque de mensajes y el consumidor lee los mensajes de este bloque.
Nota
La biblioteca de flujos de datos TPL (el espacio de nombres System.Threading.Tasks.Dataflow) no se distribuye con .NET. Para instalar el espacio de nombres System.Threading.Tasks.Dataflow en Visual Studio, abra el proyecto, seleccione Administrar paquetes NuGet en el menú Proyecto y busque en línea el paquete System.Threading.Tasks.Dataflow
. Como alternativa, para realizar la instalación con la CLI de .Net Core, ejecute dotnet add package System.Threading.Tasks.Dataflow
.
Ejemplo
En el ejemplo siguiente se muestra un modelo productor-consumidor básico que usa el flujo de datos. El método Produce
escribe matrices que contienen bytes de datos aleatorios en un objeto System.Threading.Tasks.Dataflow.ITargetBlock<TInput> y el método Consume
lee los bytes de un objeto System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>. Al actuar en las interfaces ISourceBlock<TOutput> y ITargetBlock<TInput>, en lugar de en sus tipos derivados, puede escribir código reutilizable que puede actuar en una variedad de tipos de bloques de flujo de datos. Este ejemplo utiliza la clase BufferBlock<T>. Puesto que la clase BufferBlock<T> actúa como origen y como un bloque de origen y destino, el productor y el consumidor pueden utilizar un objeto compartido para transferir datos.
El método Produce
llama al método Post en un bucle para escribir datos de forma sincrónica en el bloque de destino. Después de que el método Produce
escriba todos los datos en el bloque de destino, llama al método Complete para indicar que el bloque nunca tendrá datos adicionales disponibles. El método Consume
usa los operadores async y await (Async y Await en Visual Basic) para calcular de forma asincrónica el número total de bytes recibidos del objeto ISourceBlock<TOutput>. Para que actúe de forma asincrónica, el método Consume
llama al método OutputAvailableAsync para recibir una notificación cuando el bloque de origen tiene datos disponibles y cuando el bloque de origen nunca va a tener datos adicionales disponibles.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class DataflowProducerConsumer
{
static void Produce(ITargetBlock<byte[]> target)
{
var rand = new Random();
for (int i = 0; i < 100; ++ i)
{
var buffer = new byte[1024];
rand.NextBytes(buffer);
target.Post(buffer);
}
target.Complete();
}
static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
byte[] data = await source.ReceiveAsync();
bytesProcessed += data.Length;
}
return bytesProcessed;
}
static async Task Main()
{
var buffer = new BufferBlock<byte[]>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer);
var bytesProcessed = await consumerTask;
Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}
}
// Sample output:
// Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Friend Class DataflowProducerConsumer
Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
Dim rand As New Random()
For i As Integer = 0 To 99
Dim buffer(1023) As Byte
rand.NextBytes(buffer)
target.Post(buffer)
Next i
target.Complete()
End Sub
Private Shared Async Function ConsumeAsync(
ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte = Await source.ReceiveAsync()
bytesProcessed += data.Length
Loop
Return bytesProcessed
End Function
Shared Sub Main()
Dim buffer = New BufferBlock(Of Byte())()
Dim consumer = ConsumeAsync(buffer)
Produce(buffer)
Dim result = consumer.GetAwaiter().GetResult()
Console.WriteLine($"Processed {result:#,#} bytes.")
End Sub
End Class
' Sample output:
' Processed 102,400 bytes.
Programación sólida
En el ejemplo anterior se usa un solo consumidor para procesar los datos de origen. Si tiene varios consumidores en la aplicación, use el método TryReceive para leer datos desde el bloque de origen, como se muestra en el siguiente ejemplo.
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
while (source.TryReceive(out byte[] data))
{
bytesProcessed += data.Length;
}
}
return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte
Do While source.TryReceive(data)
bytesProcessed += data.Length
Loop
Loop
Return bytesProcessed
End Function
El método TryReceive devuelve False
cuando no hay datos disponibles. Cuando varios consumidores deben tener acceso simultáneamente al bloque de origen, este mecanismo garantiza que los datos están disponibles después de la llamada a OutputAvailableAsync.