Instrukcje: implementowanie wzorca przepływu danych producenta-konsumenta
W tym artykule dowiesz się, jak za pomocą biblioteki przepływu danych TPL zaimplementować wzorzec producenta-konsumenta. W tym wzorcu producent wysyła komunikaty do bloku komunikatów, a użytkownik odczytuje komunikaty z tego bloku.
Uwaga
Biblioteka przepływów danych TPL ( System.Threading.Tasks.Dataflow przestrzeń nazw) nie jest dystrybuowana za pomocą platformy .NET. Aby zainstalować System.Threading.Tasks.Dataflow przestrzeń nazw w programie Visual Studio, otwórz projekt, wybierz pozycję Zarządzaj pakietami NuGet z menu Project i wyszukaj pakiet w trybie online System.Threading.Tasks.Dataflow
. Alternatywnie, aby zainstalować go przy użyciu interfejsu wiersza polecenia platformy .NET Core, uruchom polecenie dotnet add package System.Threading.Tasks.Dataflow
.
Przykład
W poniższym przykładzie pokazano podstawowy model producent-konsument korzystający z przepływu danych. Metoda Produce
zapisuje tablice zawierające losowe bajty danych do System.Threading.Tasks.Dataflow.ITargetBlock<TInput> obiektu, a Consume
metoda odczytuje bajty z System.Threading.Tasks.Dataflow.ISourceBlock<TOutput> obiektu. Działając na ISourceBlock<TOutput> interfejsach i ITargetBlock<TInput> , zamiast ich typów pochodnych, można napisać kod wielokrotnego użytku, który może działać na różnych typach bloków przepływu danych. W tym przykładzie użyto BufferBlock<T> klasy . BufferBlock<T> Ponieważ klasa działa zarówno jako blok źródłowy, jak i jako blok docelowy, producent i odbiorca mogą używać obiektu udostępnionego do transferu danych.
Metoda Produce
wywołuje metodę Post w pętli, aby synchronicznie zapisywać dane w bloku docelowym. Produce
Gdy metoda zapisuje wszystkie dane w bloku docelowym, wywołuje Complete metodę , aby wskazać, że blok nigdy nie będzie miał dostępnych dodatkowych danych. Metoda Consume
używa operatorów asynchronicznych i await (Async i Await w Visual Basic), aby asynchronicznie obliczyć całkowitą liczbę bajtów odebranych z ISourceBlock<TOutput> obiektu. Aby działać asynchronicznie, metoda wywołuje OutputAvailableAsync metodę w celu odbierania powiadomienia, Consume
gdy blok źródłowy ma dostępne dane, a kiedy blok źródłowy nigdy nie będzie miał dodatkowych danych.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class DataflowProducerConsumer
{
static void Produce(ITargetBlock<byte[]> target)
{
var rand = new Random();
for (int i = 0; i < 100; ++ i)
{
var buffer = new byte[1024];
rand.NextBytes(buffer);
target.Post(buffer);
}
target.Complete();
}
static async Task<int> ConsumeAsync(ISourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
byte[] data = await source.ReceiveAsync();
bytesProcessed += data.Length;
}
return bytesProcessed;
}
static async Task Main()
{
var buffer = new BufferBlock<byte[]>();
var consumerTask = ConsumeAsync(buffer);
Produce(buffer);
var bytesProcessed = await consumerTask;
Console.WriteLine($"Processed {bytesProcessed:#,#} bytes.");
}
}
// Sample output:
// Processed 102,400 bytes.
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
Friend Class DataflowProducerConsumer
Private Shared Sub Produce(ByVal target As ITargetBlock(Of Byte()))
Dim rand As New Random()
For i As Integer = 0 To 99
Dim buffer(1023) As Byte
rand.NextBytes(buffer)
target.Post(buffer)
Next i
target.Complete()
End Sub
Private Shared Async Function ConsumeAsync(
ByVal source As ISourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte = Await source.ReceiveAsync()
bytesProcessed += data.Length
Loop
Return bytesProcessed
End Function
Shared Sub Main()
Dim buffer = New BufferBlock(Of Byte())()
Dim consumer = ConsumeAsync(buffer)
Produce(buffer)
Dim result = consumer.GetAwaiter().GetResult()
Console.WriteLine($"Processed {result:#,#} bytes.")
End Sub
End Class
' Sample output:
' Processed 102,400 bytes.
Niezawodne programowanie
W poprzednim przykładzie użyto tylko jednego konsumenta do przetwarzania danych źródłowych. Jeśli masz wielu użytkowników w aplikacji, użyj TryReceive metody , aby odczytać dane z bloku źródłowego, jak pokazano w poniższym przykładzie.
static async Task<int> ConsumeAsync(IReceivableSourceBlock<byte[]> source)
{
int bytesProcessed = 0;
while (await source.OutputAvailableAsync())
{
while (source.TryReceive(out byte[] data))
{
bytesProcessed += data.Length;
}
}
return bytesProcessed;
}
Private Shared Async Function ConsumeAsync(
ByVal source As IReceivableSourceBlock(Of Byte())) As Task(Of Integer)
Dim bytesProcessed As Integer = 0
Do While Await source.OutputAvailableAsync()
Dim data() As Byte
Do While source.TryReceive(data)
bytesProcessed += data.Length
Loop
Loop
Return bytesProcessed
End Function
Metoda TryReceive zwraca False
wartość, gdy żadne dane nie są dostępne. Gdy wielu użytkowników musi jednocześnie uzyskać dostęp do bloku źródłowego, ten mechanizm gwarantuje, że dane są nadal dostępne po wywołaniu metody OutputAvailableAsync.