共用方式為


操作說明:實作產生者-取用者資料流程模式

在本文中,將說明如何使用 TPL 資料流程程式庫實作生產者-取用者模式。 在此模式中,「生產者」會將訊息傳送至訊息區塊,而「消費者」會從該區塊讀取訊息。

注意

TPL 資料流程程式庫 (System.Threading.Tasks.Dataflow 命名空間) 並未隨 .NET 散發。 若要在 Visual Studio 中安裝 System.Threading.Tasks.Dataflow 命名空間,請開啟您的專案,從 [專案] 功能表中選擇 [管理 NuGet 套件],並於線上搜尋 System.Threading.Tasks.Dataflow 套件。 除此之外也可使用 .Net Core CLI (執行 dotnet add package System.Threading.Tasks.Dataflow) 加以安裝。

範例

下列範例將示範使用資料流程的基本生產者-取用者模型。 Produce 方法會將包含隨機位元組資料的陣列寫入 System.Threading.Tasks.Dataflow.ITargetBlock<TInput> 物件中,而 Consume 方法會從 System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> 物件中讀取位元組。 藉由處理 ISourceBlock<TOutput>ITargetBlock<TInput> 介面而不是其衍生類型,您就可以撰寫可重複使用的程式碼來處理各種資料流程區塊類型。 這個範例會使用 BufferBlock<T> 類別。 由於 BufferBlock<T> 類別會同時做為來源區塊和目標區塊,因此生產者和消費者可以使用共用物件來傳輸資料。

Produce 方法會在迴圈中呼叫 Post 方法,將資料同步寫入目標區塊中。 Produce 方法將所有資料寫入目標區塊之後會呼叫 Complete 方法,指出區塊將不會再提供任何額外的資料。 Consume 方法會使用 asyncawait 運算子 (在 Visual Basic 中為 AsyncAwait) 以非同步方式計算從 ISourceBlock<TOutput> 物件接收的位元組總數。 若要以非同步方式執行,Consume 方法會呼叫 OutputAvailableAsync 方法在來源區塊有可用資料,以及來源區塊永遠不再提供其他可用資料時收到通知。

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

穩固程式設計

前面的範例會使用單一消費者處理來源資料。 如果您的應用程式中有多個消費者,請使用 TryReceive 方法從來源區塊讀取資料,如下列範例所示。

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

沒有可用資料時,TryReceive 方法會回傳 False。 如果多個消費者必須同時存取來源區塊,這個機制就能確保在呼叫 OutputAvailableAsync 之後資料仍然可用。

另請參閱