如何:实现生成方-使用方数据流模式
本文介绍如何使用 TPL 数据流库实现生成方-使用方模式。 在此模式下,制造者向消息块发送消息,使用者从该块读取消息。
注意
TPL 数据流库(System.Threading.Tasks.Dataflow 命名空间)不随 .NET 一起分发。 若要在 Visual Studio 中安装 System.Threading.Tasks.Dataflow 命名空间,请打开项目,选择“项目”菜单中的“管理 NuGet 包”,再在线搜索 System.Threading.Tasks.Dataflow
包。 或者,若要使用 .NET Core CLI 进行安装,请运行 dotnet add package System.Threading.Tasks.Dataflow
。
示例
下面的示例演示使用数据流的基本制造者-使用者模型。 Produce
方法将包含随机字节数据的数组写入 System.Threading.Tasks.Dataflow.ITargetBlock<TInput> 对象,而 Consume
方法从 System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> 对象中读取字节。 通过对 ISourceBlock<TOutput> 和 ITargetBlock<TInput> 接口(而非其派生类型)的操作,可以编写可对多种数据流块类型执行的可重用代码。 此示例使用 BufferBlock<T> 类。 由于 BufferBlock<T> 类既充当源块又充当目标块,制造者和使用者可以使用共享对象来传输数据。
Produce
方法在循环中调用 Post 方法,以将数据同步写入目标块。 在 Produce
方法将所有数据写入目标块后,它将调用 Complete 方法来表明此块将不再有其他数据可用。 Consume
方法使用 async 和 await 运算符(Visual Basic 中的 Async 和 Await),异步计算从 ISourceBlock<TOutput> 对象收到的总字节数。 为了异步操作,Consume
方法会调用 OutputAvailableAsync 方法,以便在源块有可用数据和源块不再有其他可用数据时收到通知。
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class DataflowProducerConsumer
{
static void Produce(ITargetBlock<byte[]> target)
{
var rand = new Random();
for (int i = 0; i < 100; ++ i)
{
var buffer = new byte[1024];
rand.NextBytes(buffer);
target.Post(buffer);
}
target.Complete();
}
static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
byte[] data = await source.ReceiveAsync();
bytesProcessed += data.Length;
}
return bytesProcessed;
}
static async Task Main()
{
var buffer = new BufferBlock<byte[]>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer);
var bytesProcessed = await consumerTask;
Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}
}
// Sample output:
// Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Friend Class DataflowProducerConsumer
Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
Dim rand As New Random()
For i As Integer = 0 To 99
Dim buffer(1023) As Byte
rand.NextBytes(buffer)
target.Post(buffer)
Next i
target.Complete()
End Sub
Private Shared Async Function ConsumeAsync(
ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte = Await source.ReceiveAsync()
bytesProcessed += data.Length
Loop
Return bytesProcessed
End Function
Shared Sub Main()
Dim buffer = New BufferBlock(Of Byte())()
Dim consumer = ConsumeAsync(buffer)
Produce(buffer)
Dim result = consumer.GetAwaiter().GetResult()
Console.WriteLine($"Processed {result:#,#} bytes.")
End Sub
End Class
' Sample output:
' Processed 102,400 bytes.
可靠编程
前面的示例只使用一个使用者来处理源数据。 如果您的应用程序中有多个使用者,请使用 TryReceive 方法从源块读取数据,如以下示例所示。
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
while (source.TryReceive(out byte[] data))
{
bytesProcessed += data.Length;
}
}
return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte
Do While source.TryReceive(data)
bytesProcessed += data.Length
Loop
Loop
Return bytesProcessed
End Function
没有可用数据时,TryReceive 方法将返回 False
。 当多个使用者必须并发访问源块时,此机制可确保在调用 OutputAvailableAsync 后数据仍然可用。