Delen via


Procedure: Een gegevensstroompatroon voor producenten en consumenten implementeren

In dit artikel leert u hoe u de TPL-gegevensstroombibliotheek gebruikt om een producer-consumerpatroon te implementeren. In dit patroon verzendt de producent berichten naar een berichtblok en leest de consument berichten van dat blok.

Notitie

De TPL-gegevensstroombibliotheek (de System.Threading.Tasks.Dataflow naamruimte) wordt niet gedistribueerd met .NET. Als u de System.Threading.Tasks.Dataflow naamruimte in Visual Studio wilt installeren, opent u uw project, kiest u NuGet-pakketten beheren in het menu Project en zoekt u online naar het System.Threading.Tasks.Dataflow pakket. U kunt het ook installeren met behulp van de .NET Core CLI.dotnet add package System.Threading.Tasks.Dataflow

Opmerking

In het volgende voorbeeld ziet u een eenvoudig model voor producenten en consumenten dat gebruikmaakt van een gegevensstroom. De Produce methode schrijft matrices die willekeurige bytes aan gegevens naar een System.Threading.Tasks.Dataflow.ITargetBlock<TInput> object bevatten en de Consume methode leest bytes uit een System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> object. Door te handelen op de ISourceBlock<TOutput> en ITargetBlock<TInput> interfaces, in plaats van hun afgeleide typen, kunt u herbruikbare code schrijven die kan reageren op verschillende typen gegevensstroomblokken. In dit voorbeeld wordt de BufferBlock<T> klasse gebruikt. Omdat de BufferBlock<T> klasse fungeert als een bronblok en als doelblok, kunnen de producent en de consument een gedeeld object gebruiken om gegevens over te dragen.

Met Produce de methode wordt de Post methode in een lus aangeroepen om synchroon gegevens naar het doelblok te schrijven. Nadat de Produce methode alle gegevens naar het doelblok heeft geschreven, wordt de Complete methode aangeroepen om aan te geven dat er nooit extra gegevens beschikbaar zijn voor het blok. De Consume methode maakt gebruik van de asynchrone operators (Async and Await in Visual Basic) om het totale aantal bytes dat van het ISourceBlock<TOutput> object wordt ontvangen, asynchroon te berekenen. Om asynchroon te handelen, roept de Consume methode de OutputAvailableAsync methode aan om een melding te ontvangen wanneer het bronblok gegevens beschikbaar heeft en wanneer het bronblok nooit extra gegevens beschikbaar heeft.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

Robuuste programmering

In het voorgaande voorbeeld wordt slechts één consument gebruikt om de brongegevens te verwerken. Als u meerdere consumenten in uw toepassing hebt, gebruikt u de TryReceive methode om gegevens uit het bronblok te lezen, zoals wordt weergegeven in het volgende voorbeeld.

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

De TryReceive methode retourneert False wanneer er geen gegevens beschikbaar zijn. Wanneer meerdere consumenten gelijktijdig toegang moeten krijgen tot het bronblok, garandeert dit mechanisme dat gegevens nog steeds beschikbaar zijn na de aanroep naar OutputAvailableAsync.

Zie ook