如何:实现生成方-使用方数据流模式

本文介绍如何使用 TPL 数据流库实现生成方-使用方模式。 在此模式下,制造者向消息块发送消息,使用者从该块读取消息。

注意

TPL 数据流库(System.Threading.Tasks.Dataflow 命名空间)不随 .NET 一起分发。 若要在 Visual Studio 中安装 System.Threading.Tasks.Dataflow 命名空间,请打开项目,选择“项目”菜单中的“管理 NuGet 包”,再在线搜索 System.Threading.Tasks.Dataflow 包。 或者,若要使用 .NET Core CLI 进行安装,请运行 dotnet add package System.Threading.Tasks.Dataflow

示例

下面的示例演示使用数据流的基本制造者-使用者模型。 Produce 方法将包含随机字节数据的数组写入 System.Threading.Tasks.Dataflow.ITargetBlock<TInput> 对象,而 Consume 方法从 System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> 对象中读取字节。 通过对 ISourceBlock<TOutput>ITargetBlock<TInput> 接口(而非其派生类型)的操作,可以编写可对多种数据流块类型执行的可重用代码。 此示例使用 BufferBlock<T> 类。 由于 BufferBlock<T> 类既充当源块又充当目标块,制造者和使用者可以使用共享对象来传输数据。

Produce 方法在循环中调用 Post 方法,以将数据同步写入目标块。 在 Produce 方法将所有数据写入目标块后,它将调用 Complete 方法来表明此块将不再有其他数据可用。 Consume 方法使用 asyncawait 运算符(Visual Basic 中的 AsyncAwait),异步计算从 ISourceBlock<TOutput> 对象收到的总字节数。 为了异步操作,Consume 方法会调用 OutputAvailableAsync 方法,以便在源块有可用数据和源块不再有其他可用数据时收到通知。

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowProducerConsumer
{
    static void Produce(ITargetBlock<byte[]> target)
    {
        var rand = new Random();

        for (int i = 0; i < 100; ++ i)
        {
            var buffer = new byte[1024];
            rand.NextBytes(buffer);
            target.Post(buffer);
        }

        target.Complete();
    }

    static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
    {
        int bytesProcessed = 0;

        while (await source.OutputAvailableAsync())
        {
            byte[] data = await source.ReceiveAsync();
            bytesProcessed += data.Length;
        }

        return bytesProcessed;
    }

    static async Task Main()
    {
        var buffer = new BufferBlock<byte[]>();
        var consumerTask = ConsumeAsync(buffer);
        Produce(buffer);

        var bytesProcessed = await consumerTask;

        Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
    }
}

// Sample  output:
//     Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow

Friend Class DataflowProducerConsumer
    Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
        Dim rand As New Random()

        For i As Integer = 0 To 99
            Dim buffer(1023) As Byte
            rand.NextBytes(buffer)
            target.Post(buffer)
        Next i

        target.Complete()
    End Sub

    Private Shared Async Function ConsumeAsync(
        ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
        Dim bytesProcessed As Integer = 0

        Do While Await source.OutputAvailableAsync()
            Dim data() As Byte = Await source.ReceiveAsync()
            bytesProcessed += data.Length
        Loop

        Return bytesProcessed
    End Function

    Shared Sub Main()
        Dim buffer = New BufferBlock(Of Byte())()
        Dim consumer = ConsumeAsync(buffer)
        Produce(buffer)

        Dim result = consumer.GetAwaiter().GetResult()

        Console.WriteLine($"Processed {result:#,#} bytes.")
    End Sub
End Class

' Sample output:
'     Processed 102,400 bytes.

可靠编程

前面的示例只使用一个使用者来处理源数据。 如果您的应用程序中有多个使用者,请使用 TryReceive 方法从源块读取数据,如以下示例所示。

static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
    int bytesProcessed = 0;
    while (await source.OutputAvailableAsync())
    {
        while (source.TryReceive(out byte[] data))
        {
            bytesProcessed += data.Length;
        }
    }
    return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
    ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
    Dim bytesProcessed As Integer = 0
    
    Do While Await source.OutputAvailableAsync()
        Dim data() As Byte
        Do While source.TryReceive(data)
            bytesProcessed += data.Length
        Loop
    Loop

    Return bytesProcessed
End Function

没有可用数据时,TryReceive 方法将返回 False。 当多个使用者必须并发访问源块时,此机制可确保在调用 OutputAvailableAsync 后数据仍然可用。

请参阅