Postupy: Implementace vzoru toku dat producenta a příjemce
V tomto článku se dozvíte, jak použít knihovnu toku dat TPL k implementaci vzoru producenta a příjemce. V tomto vzoru producent odesílá zprávy do bloku zpráv a příjemce čte zprávy z tohoto bloku.
Poznámka:
Knihovna toku dat TPL ( System.Threading.Tasks.Dataflow obor názvů) není distribuována s .NET. Pokud chcete nainstalovat System.Threading.Tasks.Dataflow obor názvů v sadě Visual Studio, otevřete projekt, v nabídce Projekt zvolte Spravovat balíčky NuGet a vyhledejte System.Threading.Tasks.Dataflow
balíček online. Pokud ho chcete nainstalovat pomocí rozhraní příkazového řádku .NET Core, spusťte dotnet add package System.Threading.Tasks.Dataflow
.
Příklad
Následující příklad ukazuje základní model producent-příjemce, který používá tok dat. Metoda Produce
zapisuje pole obsahující náhodné bajty dat do System.Threading.Tasks.Dataflow.ITargetBlock<TInput> objektu a Consume
metoda čte bajty z objektu System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> . Díky tomu, že místo jejich odvozených typů funguje na ISourceBlock<TOutput>ITargetBlock<TInput> rozhraních, můžete napsat opakovaně použitelný kód, který může fungovat na různých typech bloků toku dat. Tento příklad používá BufferBlock<T> třídu. Vzhledem k tomu, že BufferBlock<T> třída funguje jako zdrojový blok i jako cílový blok, může producent a příjemce k přenosu dat použít sdílený objekt.
Metoda Produce
volá metodu Post ve smyčce pro synchronní zápis dat do cílového bloku. Jakmile Produce
metoda zapíše všechna data do cílového bloku, zavolá metodu Complete , která označuje, že blok nebude mít nikdy k dispozici další data. Metoda používá asynchronní operátory a operátory await (Async a Await v jazyce Visual Basic) k asynchronnímu výpočtu celkového počtu bajtů přijatých z objektuISourceBlock<TOutput>.Consume
Aby fungovala asynchronně, Consume
metoda volá metodu OutputAvailableAsync , aby obdržela oznámení, když zdrojový blok obsahuje data k dispozici a kdy zdrojový blok nikdy nebude mít k dispozici další data.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class DataflowProducerConsumer
{
static void Produce(ITargetBlock<byte[]> target)
{
var rand = new Random();
for (int i = 0; i < 100; ++ i)
{
var buffer = new byte[1024];
rand.NextBytes(buffer);
target.Post(buffer);
}
target.Complete();
}
static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
byte[] data = await source.ReceiveAsync();
bytesProcessed += data.Length;
}
return bytesProcessed;
}
static async Task Main()
{
var buffer = new BufferBlock<byte[]>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer);
var bytesProcessed = await consumerTask;
Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}
}
// Sample output:
// Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Friend Class DataflowProducerConsumer
Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
Dim rand As New Random()
For i As Integer = 0 To 99
Dim buffer(1023) As Byte
rand.NextBytes(buffer)
target.Post(buffer)
Next i
target.Complete()
End Sub
Private Shared Async Function ConsumeAsync(
ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte = Await source.ReceiveAsync()
bytesProcessed += data.Length
Loop
Return bytesProcessed
End Function
Shared Sub Main()
Dim buffer = New BufferBlock(Of Byte())()
Dim consumer = ConsumeAsync(buffer)
Produce(buffer)
Dim result = consumer.GetAwaiter().GetResult()
Console.WriteLine($"Processed {result:#,#} bytes.")
End Sub
End Class
' Sample output:
' Processed 102,400 bytes.
Robustní programování
Předchozí příklad používá ke zpracování zdrojových dat pouze jednoho příjemce. Pokud máte ve své aplikaci více příjemců, použijte metodu TryReceive ke čtení dat ze zdrojového bloku, jak je znázorněno v následujícím příkladu.
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
while (source.TryReceive(out byte[] data))
{
bytesProcessed += data.Length;
}
}
return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte
Do While source.TryReceive(data)
bytesProcessed += data.Length
Loop
Loop
Return bytesProcessed
End Function
Metoda TryReceive vrátí False
, pokud nejsou k dispozici žádná data. Pokud více příjemců musí přistupovat ke zdrojovému bloku souběžně, tento mechanismus zaručuje, že data jsou stále k dispozici po volání OutputAvailableAsync.