Freigeben über


Vorgehensweise: Implementieren eines Producer-Consumer-Musters

In diesem Artikel wird beschrieben, wie die TPL-Dataflowbibliothek verwendet wird, um ein Producer-Consumer-Muster zu implementieren. Bei diesem Muster sendet der Producer Nachrichten an einen Nachrichtenblock, während der Consumer Nachrichten aus diesem Block ausliest.

Hinweis

Die TPL-Datenflussbibliothek (System.Threading.Tasks.Dataflow-Namespace) wird nicht mit .NET ausgeliefert. Öffnen Sie zum Installieren des System.Threading.Tasks.Dataflow-Namespace in Visual Studio Ihr Projekt, wählen Sie im Menü Projekt die Option NuGet-Pakete verwalten aus, und suchen Sie online nach dem System.Threading.Tasks.Dataflow-Paket. Alternativ können Sie es mithilfe der .NET Core-CLI installieren und dazu dotnet add package System.Threading.Tasks.Dataflow ausführen.

Beispiel

Das folgende Beispiel zeigt ein grundlegendes Producer-Consumer-Modell, das Datenfluss verwendet. Die Produce-Methode schreibt Arrays, die zufällige Datenbytes enthalten, in ein System.Threading.Tasks.Dataflow.ITargetBlock<TInput>-Objekt, und die Consume-Methode liest Bytes aus einem System.Threading.Tasks.Dataflow.ISourceBlock<TOutput>-Objekt. Da Sie sich mit den Schnittstellen ISourceBlock<TOutput> und ITargetBlock<TInput> befassen, anstatt mit ihren abgeleiteten Typen, können Sie wiederverwendbaren Code schreiben und diesen auf eine Vielzahl von Datenflussblock-Typen anwenden. In diesem Beispiel wird die BufferBlock<T>-Klasse verwendet. Da die BufferBlock<T>-Klasse sowohl als Quell- als auch als Zielblock fungiert, können der Producer und der Consumer ein freigegebenes Objekt zum Übertragen von Daten verwenden.

Die Produce-Methode ruft die Post-Methode in einer Schleife auf, um die Daten synchron in den Zielblock zu schreiben. Nachdem die Produce-Methode alle Daten in den Zielblock geschrieben hat, ruft sie die Complete-Methode auf, um anzugeben, dass der Block niemals über zusätzliche Daten verfügen wird. Die Consume-Methode verwendet die Operatoren async und await (Async und Await in Visual Basic) zur asynchronen Berechnung der Gesamtzahl von Bytes, die vom ISourceBlock<TOutput>-Objekt empfangen werden. Für die Asynchronität ruft die Consume-Methode die OutputAvailableAsync-Methode auf, um eine Benachrichtigung zu erhalten, wenn der Quellblock über Daten verfügt und wenn dem Quellblock niemals zusätzliche Daten zur Verfügung stehen werden.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

Stabile Programmierung

Im obigen Beispiel wird nur ein Consumer verwendet, um die Quelldaten zu verarbeiten. Wenn Sie in Ihrer Anwendung über mehrere Consumer verfügen, verwenden Sie die TryReceive-Methode, um wie im folgenden Beispiel Daten aus dem Quellblock zu lesen.

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

Die TryReceive-Methode gibt False zurück, wenn keine Daten verfügbar sind. Wenn mehrere Consumer gleichzeitig auf den Quellblock zugreifen müssen, gewährleistet dieser Mechanismus, dass Daten nach dem Aufruf von OutputAvailableAsync weiterhin verfügbar sind.

Siehe auch