Porady: Korzystanie z klasy JoinBlock do odczytywania danych z wielu źródeł
W tym dokumencie wyjaśniono, jak używać JoinBlock<T1,T2> klasy do wykonywania operacji, gdy dane są dostępne z wielu źródeł. Pokazuje również, jak używać trybu niechłannego, aby umożliwić wydajniejsze udostępnianie źródła danych wielu bloków sprzężenia.
Uwaga
Biblioteka przepływów danych TPL ( System.Threading.Tasks.Dataflow przestrzeń nazw) nie jest dystrybuowana za pomocą platformy .NET. Aby zainstalować System.Threading.Tasks.Dataflow przestrzeń nazw w programie Visual Studio, otwórz projekt, wybierz pozycję Zarządzaj pakietami NuGet z menu Project i wyszukaj pakiet w trybie online System.Threading.Tasks.Dataflow
. Alternatywnie, aby zainstalować go przy użyciu interfejsu wiersza polecenia platformy .NET Core, uruchom polecenie dotnet add package System.Threading.Tasks.Dataflow
.
Przykład
Poniższy przykład definiuje trzy typy zasobów, NetworkResource
, FileResource
i MemoryResource
, i wykonuje operacje, gdy zasoby staną się dostępne. W tym przykładzie wymagana jest para i MemoryResource
, aby wykonać pierwszą NetworkResource
operację i FileResource
MemoryResource
parę, aby wykonać drugą operację. Aby umożliwić wykonywanie tych operacji, gdy są dostępne wszystkie wymagane zasoby, w tym przykładzie użyto JoinBlock<T1,T2> klasy . JoinBlock<T1,T2> Gdy obiekt odbiera dane ze wszystkich źródeł, propaguje te dane do obiektu docelowego, który w tym przykładzie jest obiektemActionBlock<TInput>. Oba JoinBlock<T1,T2> obiekty odczytane z udostępnionej MemoryResource
puli obiektów.
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to use non-greedy join blocks to distribute
// resources among a dataflow network.
class Program
{
// Represents a resource. A derived class might represent
// a limited resource such as a memory, network, or I/O
// device.
abstract class Resource
{
}
// Represents a memory resource. For brevity, the details of
// this class are omitted.
class MemoryResource : Resource
{
}
// Represents a network resource. For brevity, the details of
// this class are omitted.
class NetworkResource : Resource
{
}
// Represents a file resource. For brevity, the details of
// this class are omitted.
class FileResource : Resource
{
}
static void Main(string[] args)
{
// Create three BufferBlock<T> objects. Each object holds a different
// type of resource.
var networkResources = new BufferBlock<NetworkResource>();
var fileResources = new BufferBlock<FileResource>();
var memoryResources = new BufferBlock<MemoryResource>();
// Create two non-greedy JoinBlock<T1, T2> objects.
// The first join works with network and memory resources;
// the second pool works with file and memory resources.
var joinNetworkAndMemoryResources =
new JoinBlock<NetworkResource, MemoryResource>(
new GroupingDataflowBlockOptions
{
Greedy = false
});
var joinFileAndMemoryResources =
new JoinBlock<FileResource, MemoryResource>(
new GroupingDataflowBlockOptions
{
Greedy = false
});
// Create two ActionBlock<T> objects.
// The first block acts on a network resource and a memory resource.
// The second block acts on a file resource and a memory resource.
var networkMemoryAction =
new ActionBlock<Tuple<NetworkResource, MemoryResource>>(
data =>
{
// Perform some action on the resources.
// Print a message.
Console.WriteLine("Network worker: using resources...");
// Simulate a lengthy operation that uses the resources.
Thread.Sleep(new Random().Next(500, 2000));
// Print a message.
Console.WriteLine("Network worker: finished using resources...");
// Release the resources back to their respective pools.
networkResources.Post(data.Item1);
memoryResources.Post(data.Item2);
});
var fileMemoryAction =
new ActionBlock<Tuple<FileResource, MemoryResource>>(
data =>
{
// Perform some action on the resources.
// Print a message.
Console.WriteLine("File worker: using resources...");
// Simulate a lengthy operation that uses the resources.
Thread.Sleep(new Random().Next(500, 2000));
// Print a message.
Console.WriteLine("File worker: finished using resources...");
// Release the resources back to their respective pools.
fileResources.Post(data.Item1);
memoryResources.Post(data.Item2);
});
// Link the resource pools to the JoinBlock<T1, T2> objects.
// Because these join blocks operate in non-greedy mode, they do not
// take the resource from a pool until all resources are available from
// all pools.
networkResources.LinkTo(joinNetworkAndMemoryResources.Target1);
memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2);
fileResources.LinkTo(joinFileAndMemoryResources.Target1);
memoryResources.LinkTo(joinFileAndMemoryResources.Target2);
// Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.
joinNetworkAndMemoryResources.LinkTo(networkMemoryAction);
joinFileAndMemoryResources.LinkTo(fileMemoryAction);
// Populate the resource pools. In this example, network and
// file resources are more abundant than memory resources.
networkResources.Post(new NetworkResource());
networkResources.Post(new NetworkResource());
networkResources.Post(new NetworkResource());
memoryResources.Post(new MemoryResource());
fileResources.Post(new FileResource());
fileResources.Post(new FileResource());
fileResources.Post(new FileResource());
// Allow data to flow through the network for several seconds.
Thread.Sleep(10000);
}
}
/* Sample output:
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
*/
Imports System.Threading
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to use non-greedy join blocks to distribute
' resources among a dataflow network.
Friend Class Program
' Represents a resource. A derived class might represent
' a limited resource such as a memory, network, or I/O
' device.
Private MustInherit Class Resource
End Class
' Represents a memory resource. For brevity, the details of
' this class are omitted.
Private Class MemoryResource
Inherits Resource
End Class
' Represents a network resource. For brevity, the details of
' this class are omitted.
Private Class NetworkResource
Inherits Resource
End Class
' Represents a file resource. For brevity, the details of
' this class are omitted.
Private Class FileResource
Inherits Resource
End Class
Shared Sub Main(ByVal args() As String)
' Create three BufferBlock<T> objects. Each object holds a different
' type of resource.
Dim networkResources = New BufferBlock(Of NetworkResource)()
Dim fileResources = New BufferBlock(Of FileResource)()
Dim memoryResources = New BufferBlock(Of MemoryResource)()
' Create two non-greedy JoinBlock<T1, T2> objects.
' The first join works with network and memory resources;
' the second pool works with file and memory resources.
Dim joinNetworkAndMemoryResources = New JoinBlock(Of NetworkResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})
Dim joinFileAndMemoryResources = New JoinBlock(Of FileResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})
' Create two ActionBlock<T> objects.
' The first block acts on a network resource and a memory resource.
' The second block acts on a file resource and a memory resource.
Dim networkMemoryAction = New ActionBlock(Of Tuple(Of NetworkResource, MemoryResource))(Sub(data)
' Perform some action on the resources.
' Print a message.
' Simulate a lengthy operation that uses the resources.
' Print a message.
' Release the resources back to their respective pools.
Console.WriteLine("Network worker: using resources...")
Thread.Sleep(New Random().Next(500, 2000))
Console.WriteLine("Network worker: finished using resources...")
networkResources.Post(data.Item1)
memoryResources.Post(data.Item2)
End Sub)
Dim fileMemoryAction = New ActionBlock(Of Tuple(Of FileResource, MemoryResource))(Sub(data)
' Perform some action on the resources.
' Print a message.
' Simulate a lengthy operation that uses the resources.
' Print a message.
' Release the resources back to their respective pools.
Console.WriteLine("File worker: using resources...")
Thread.Sleep(New Random().Next(500, 2000))
Console.WriteLine("File worker: finished using resources...")
fileResources.Post(data.Item1)
memoryResources.Post(data.Item2)
End Sub)
' Link the resource pools to the JoinBlock<T1, T2> objects.
' Because these join blocks operate in non-greedy mode, they do not
' take the resource from a pool until all resources are available from
' all pools.
networkResources.LinkTo(joinNetworkAndMemoryResources.Target1)
memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2)
fileResources.LinkTo(joinFileAndMemoryResources.Target1)
memoryResources.LinkTo(joinFileAndMemoryResources.Target2)
' Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.
joinNetworkAndMemoryResources.LinkTo(networkMemoryAction)
joinFileAndMemoryResources.LinkTo(fileMemoryAction)
' Populate the resource pools. In this example, network and
' file resources are more abundant than memory resources.
networkResources.Post(New NetworkResource())
networkResources.Post(New NetworkResource())
networkResources.Post(New NetworkResource())
memoryResources.Post(New MemoryResource())
fileResources.Post(New FileResource())
fileResources.Post(New FileResource())
fileResources.Post(New FileResource())
' Allow data to flow through the network for several seconds.
Thread.Sleep(10000)
End Sub
End Class
' Sample output:
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'
Aby umożliwić wydajne korzystanie z udostępnionej MemoryResource
puli obiektów, w tym przykładzie określono GroupingDataflowBlockOptions obiekt, który ma Greedy właściwość ustawioną na False
JoinBlock<T1,T2> tworzenie obiektów, które działają w trybie niechłannym. Blok sprzężenia niechłannego odroczy wszystkie komunikaty przychodzące, dopóki nie będzie dostępny z każdego źródła. Jeśli którykolwiek z odroczonych komunikatów został zaakceptowany przez inny blok, blok sprzężenia ponownie uruchomi proces. Tryb niechłanny umożliwia bloki sprzężenia, które współdzielą jeden lub więcej bloków źródłowych, aby kontynuować postęp, ponieważ inne bloki czekają na dane. W tym przykładzie, jeśli MemoryResource
obiekt zostanie dodany do memoryResources
puli, pierwszy blok sprzężenia, który otrzyma drugie źródło danych, może poczynić postęp. Jeśli w tym przykładzie użyto trybu chciwości, czyli domyślnego, jeden blok sprzężenia może przyjąć MemoryResource
obiekt i poczekać na udostępnienie drugiego zasobu. Jeśli jednak drugi blok sprzężenia ma dostępne drugie źródło danych, nie może wykonać postępu, ponieważ MemoryResource
obiekt został pobrany przez inny blok sprzężenia.
Niezawodne programowanie
Korzystanie z niechłannych sprzężeń może również pomóc w zapobieganiu zakleszczaniu w aplikacji. W aplikacji programowej zakleszczenie występuje, gdy co najmniej dwa procesy przechowują zasób i wzajemnie oczekują na wydanie innego zasobu. Rozważmy aplikację, która definiuje dwa JoinBlock<T1,T2> obiekty. Oba obiekty odczytują dane z dwóch udostępnionych bloków źródłowych. W trybie chciwości, jeśli jeden blok sprzężenia odczytuje z pierwszego źródła, a drugi blok sprzężenia odczytuje się z drugiego źródła, aplikacja może zakleszczeć, ponieważ oba bloki sprzężenia wzajemnie czekają, aż drugi zwolni jego zasób. W trybie niechłannym każdy blok sprzężenia odczytuje ze swoich źródeł tylko wtedy, gdy wszystkie dane są dostępne, a zatem ryzyko zakleszczenia zostanie wyeliminowane.