Procedure: Een gegevensstroompatroon voor producenten en consumenten implementeren
In dit artikel leert u hoe u de TPL-gegevensstroombibliotheek gebruikt om een producer-consumerpatroon te implementeren. In dit patroon verzendt de producent berichten naar een berichtblok en leest de consument berichten van dat blok.
Notitie
De TPL-gegevensstroombibliotheek (de System.Threading.Tasks.Dataflow naamruimte) wordt niet gedistribueerd met .NET. Als u de System.Threading.Tasks.Dataflow naamruimte in Visual Studio wilt installeren, opent u uw project, kiest u NuGet-pakketten beheren in het menu Project en zoekt u online naar het System.Threading.Tasks.Dataflow
pakket. U kunt het ook installeren met behulp van de .NET Core CLI.dotnet add package System.Threading.Tasks.Dataflow
Opmerking
In het volgende voorbeeld ziet u een eenvoudig model voor producenten en consumenten dat gebruikmaakt van een gegevensstroom. De Produce
methode schrijft matrices die willekeurige bytes aan gegevens naar een System.Threading.Tasks.Dataflow.ITargetBlock<TInput> object bevatten en de Consume
methode leest bytes uit een System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> object. Door te handelen op de ISourceBlock<TOutput> en ITargetBlock<TInput> interfaces, in plaats van hun afgeleide typen, kunt u herbruikbare code schrijven die kan reageren op verschillende typen gegevensstroomblokken. In dit voorbeeld wordt de BufferBlock<T> klasse gebruikt. Omdat de BufferBlock<T> klasse fungeert als een bronblok en als doelblok, kunnen de producent en de consument een gedeeld object gebruiken om gegevens over te dragen.
Met Produce
de methode wordt de Post methode in een lus aangeroepen om synchroon gegevens naar het doelblok te schrijven. Nadat de Produce
methode alle gegevens naar het doelblok heeft geschreven, wordt de Complete methode aangeroepen om aan te geven dat er nooit extra gegevens beschikbaar zijn voor het blok. De Consume
methode maakt gebruik van de asynchrone operators (Async and Await in Visual Basic) om het totale aantal bytes dat van het ISourceBlock<TOutput> object wordt ontvangen, asynchroon te berekenen. Om asynchroon te handelen, roept de Consume
methode de OutputAvailableAsync methode aan om een melding te ontvangen wanneer het bronblok gegevens beschikbaar heeft en wanneer het bronblok nooit extra gegevens beschikbaar heeft.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class DataflowProducerConsumer
{
static void Produce(ITargetBlock<byte[]> target)
{
var rand = new Random();
for (int i = 0; i < 100; ++ i)
{
var buffer = new byte[1024];
rand.NextBytes(buffer);
target.Post(buffer);
}
target.Complete();
}
static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
byte[] data = await source.ReceiveAsync();
bytesProcessed += data.Length;
}
return bytesProcessed;
}
static async Task Main()
{
var buffer = new BufferBlock<byte[]>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer);
var bytesProcessed = await consumerTask;
Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}
}
// Sample output:
// Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Friend Class DataflowProducerConsumer
Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
Dim rand As New Random()
For i As Integer = 0 To 99
Dim buffer(1023) As Byte
rand.NextBytes(buffer)
target.Post(buffer)
Next i
target.Complete()
End Sub
Private Shared Async Function ConsumeAsync(
ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte = Await source.ReceiveAsync()
bytesProcessed += data.Length
Loop
Return bytesProcessed
End Function
Shared Sub Main()
Dim buffer = New BufferBlock(Of Byte())()
Dim consumer = ConsumeAsync(buffer)
Produce(buffer)
Dim result = consumer.GetAwaiter().GetResult()
Console.WriteLine($"Processed {result:#,#} bytes.")
End Sub
End Class
' Sample output:
' Processed 102,400 bytes.
Robuuste programmering
In het voorgaande voorbeeld wordt slechts één consument gebruikt om de brongegevens te verwerken. Als u meerdere consumenten in uw toepassing hebt, gebruikt u de TryReceive methode om gegevens uit het bronblok te lezen, zoals wordt weergegeven in het volgende voorbeeld.
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
while (source.TryReceive(out byte[] data))
{
bytesProcessed += data.Length;
}
}
return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte
Do While source.TryReceive(data)
bytesProcessed += data.Length
Loop
Loop
Return bytesProcessed
End Function
De TryReceive methode retourneert False
wanneer er geen gegevens beschikbaar zijn. Wanneer meerdere consumenten gelijktijdig toegang moeten krijgen tot het bronblok, garandeert dit mechanisme dat gegevens nog steeds beschikbaar zijn na de aanroep naar OutputAvailableAsync.