操作說明:實作產生者-取用者資料流程模式
在本文中,將說明如何使用 TPL 資料流程程式庫實作生產者-取用者模式。 在此模式中,「生產者」會將訊息傳送至訊息區塊,而「消費者」會從該區塊讀取訊息。
注意
TPL 資料流程程式庫 (System.Threading.Tasks.Dataflow 命名空間) 並未隨 .NET 散發。 若要在 Visual Studio 中安裝 System.Threading.Tasks.Dataflow 命名空間,請開啟您的專案,從 [專案] 功能表中選擇 [管理 NuGet 套件],並於線上搜尋 System.Threading.Tasks.Dataflow
套件。 除此之外也可使用 .Net Core CLI (執行 dotnet add package System.Threading.Tasks.Dataflow
) 加以安裝。
範例
下列範例將示範使用資料流程的基本生產者-取用者模型。 Produce
方法會將包含隨機位元組資料的陣列寫入 System.Threading.Tasks.Dataflow.ITargetBlock<TInput> 物件中,而 Consume
方法會從 System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> 物件中讀取位元組。 藉由處理 ISourceBlock<TOutput> 和 ITargetBlock<TInput> 介面而不是其衍生類型,您就可以撰寫可重複使用的程式碼來處理各種資料流程區塊類型。 這個範例會使用 BufferBlock<T> 類別。 由於 BufferBlock<T> 類別會同時做為來源區塊和目標區塊,因此生產者和消費者可以使用共用物件來傳輸資料。
Produce
方法會在迴圈中呼叫 Post 方法,將資料同步寫入目標區塊中。 Produce
方法將所有資料寫入目標區塊之後會呼叫 Complete 方法,指出區塊將不會再提供任何額外的資料。 Consume
方法會使用 async 和 await 運算子 (在 Visual Basic 中為 Async 和 Await) 以非同步方式計算從 ISourceBlock<TOutput> 物件接收的位元組總數。 若要以非同步方式執行,Consume
方法會呼叫 OutputAvailableAsync 方法在來源區塊有可用資料,以及來源區塊永遠不再提供其他可用資料時收到通知。
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class DataflowProducerConsumer
{
static void Produce(ITargetBlock<byte[]> target)
{
var rand = new Random();
for (int i = 0; i < 100; ++ i)
{
var buffer = new byte[1024];
rand.NextBytes(buffer);
target.Post(buffer);
}
target.Complete();
}
static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
byte[] data = await source.ReceiveAsync();
bytesProcessed += data.Length;
}
return bytesProcessed;
}
static async Task Main()
{
var buffer = new BufferBlock<byte[]>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer);
var bytesProcessed = await consumerTask;
Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}
}
// Sample output:
// Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Friend Class DataflowProducerConsumer
Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
Dim rand As New Random()
For i As Integer = 0 To 99
Dim buffer(1023) As Byte
rand.NextBytes(buffer)
target.Post(buffer)
Next i
target.Complete()
End Sub
Private Shared Async Function ConsumeAsync(
ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte = Await source.ReceiveAsync()
bytesProcessed += data.Length
Loop
Return bytesProcessed
End Function
Shared Sub Main()
Dim buffer = New BufferBlock(Of Byte())()
Dim consumer = ConsumeAsync(buffer)
Produce(buffer)
Dim result = consumer.GetAwaiter().GetResult()
Console.WriteLine($"Processed {result:#,#} bytes.")
End Sub
End Class
' Sample output:
' Processed 102,400 bytes.
穩固程式設計
前面的範例會使用單一消費者處理來源資料。 如果您的應用程式中有多個消費者,請使用 TryReceive 方法從來源區塊讀取資料,如下列範例所示。
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
while (source.TryReceive(out byte[] data))
{
bytesProcessed += data.Length;
}
}
return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte
Do While source.TryReceive(data)
bytesProcessed += data.Length
Loop
Loop
Return bytesProcessed
End Function
沒有可用資料時,TryReceive 方法會回傳 False
。 如果多個消費者必須同時存取來源區塊,這個機制就能確保在呼叫 OutputAvailableAsync 之後資料仍然可用。