Vorgehensweise: Verwenden von JoinBlock zum Lesen aus mehreren Quellen
In diesem Dokument erfahren Sie, wie Sie mithilfe der JoinBlock<T1,T2>-Klasse einen Vorgang ausführen, wenn Daten aus mehreren Quellen verfügbar sind. Zudem wird der nicht gierige Modus veranschaulicht, durch den mehrere Gruppierungsblöcke eine Datenquelle effizienter gemeinsam verwenden können.
Hinweis
Die TPL-Datenflussbibliothek (System.Threading.Tasks.Dataflow-Namespace) wird nicht mit .NET ausgeliefert. Öffnen Sie zum Installieren des System.Threading.Tasks.Dataflow-Namespace in Visual Studio Ihr Projekt, wählen Sie im Menü Projekt die Option NuGet-Pakete verwalten aus, und suchen Sie online nach dem System.Threading.Tasks.Dataflow
-Paket. Alternativ können Sie es mithilfe der .NET Core-CLI installieren und dazu dotnet add package System.Threading.Tasks.Dataflow
ausführen.
Beispiel
Im folgenden Beispiel werden drei Ressourcentypen (NetworkResource
, FileResource
und MemoryResource
) definiert und Vorgänge ausführt, sobald Ressourcen verfügbar werden. Für dieses Bespiel wird ein Paar aus NetworkResource
und MemoryResource
benötigt, um den ersten Vorgang auszuführen, und ein Paar aus FileResource
und MemoryResource
, um den zweiten Vorgang auszuführen. Damit diese Vorgänge ausgeführt werden, wenn alle erforderlichen Ressourcen verfügbar sind, wird in diesem Beispiel die JoinBlock<T1,T2>-Klasse verwendet. Wenn ein JoinBlock<T1,T2>-Objekt Daten aus allen Ressourcen empfängt, gibt es diese an das Ziel weiter, d.h. in diesem Beispiel an das ActionBlock<TInput>-Objekt. Beide JoinBlock<T1,T2>-Objekte lesen aus dem freigegebenen Pool von MemoryResource
-Objekten.
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to use non-greedy join blocks to distribute
// resources among a dataflow network.
class Program
{
// Represents a resource. A derived class might represent
// a limited resource such as a memory, network, or I/O
// device.
abstract class Resource
{
}
// Represents a memory resource. For brevity, the details of
// this class are omitted.
class MemoryResource : Resource
{
}
// Represents a network resource. For brevity, the details of
// this class are omitted.
class NetworkResource : Resource
{
}
// Represents a file resource. For brevity, the details of
// this class are omitted.
class FileResource : Resource
{
}
static void Main(string[] args)
{
// Create three BufferBlock<T> objects. Each object holds a different
// type of resource.
var networkResources = new BufferBlock<NetworkResource>();
var fileResources = new BufferBlock<FileResource>();
var memoryResources = new BufferBlock<MemoryResource>();
// Create two non-greedy JoinBlock<T1, T2> objects.
// The first join works with network and memory resources;
// the second pool works with file and memory resources.
var joinNetworkAndMemoryResources =
new JoinBlock<NetworkResource, MemoryResource>(
new GroupingDataflowBlockOptions
{
Greedy = false
});
var joinFileAndMemoryResources =
new JoinBlock<FileResource, MemoryResource>(
new GroupingDataflowBlockOptions
{
Greedy = false
});
// Create two ActionBlock<T> objects.
// The first block acts on a network resource and a memory resource.
// The second block acts on a file resource and a memory resource.
var networkMemoryAction =
new ActionBlock<Tuple<NetworkResource, MemoryResource>>(
data =>
{
// Perform some action on the resources.
// Print a message.
Console.WriteLine("Network worker: using resources...");
// Simulate a lengthy operation that uses the resources.
Thread.Sleep(new Random().Next(500, 2000));
// Print a message.
Console.WriteLine("Network worker: finished using resources...");
// Release the resources back to their respective pools.
networkResources.Post(data.Item1);
memoryResources.Post(data.Item2);
});
var fileMemoryAction =
new ActionBlock<Tuple<FileResource, MemoryResource>>(
data =>
{
// Perform some action on the resources.
// Print a message.
Console.WriteLine("File worker: using resources...");
// Simulate a lengthy operation that uses the resources.
Thread.Sleep(new Random().Next(500, 2000));
// Print a message.
Console.WriteLine("File worker: finished using resources...");
// Release the resources back to their respective pools.
fileResources.Post(data.Item1);
memoryResources.Post(data.Item2);
});
// Link the resource pools to the JoinBlock<T1, T2> objects.
// Because these join blocks operate in non-greedy mode, they do not
// take the resource from a pool until all resources are available from
// all pools.
networkResources.LinkTo(joinNetworkAndMemoryResources.Target1);
memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2);
fileResources.LinkTo(joinFileAndMemoryResources.Target1);
memoryResources.LinkTo(joinFileAndMemoryResources.Target2);
// Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.
joinNetworkAndMemoryResources.LinkTo(networkMemoryAction);
joinFileAndMemoryResources.LinkTo(fileMemoryAction);
// Populate the resource pools. In this example, network and
// file resources are more abundant than memory resources.
networkResources.Post(new NetworkResource());
networkResources.Post(new NetworkResource());
networkResources.Post(new NetworkResource());
memoryResources.Post(new MemoryResource());
fileResources.Post(new FileResource());
fileResources.Post(new FileResource());
fileResources.Post(new FileResource());
// Allow data to flow through the network for several seconds.
Thread.Sleep(10000);
}
}
/* Sample output:
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
*/
Imports System.Threading
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to use non-greedy join blocks to distribute
' resources among a dataflow network.
Friend Class Program
' Represents a resource. A derived class might represent
' a limited resource such as a memory, network, or I/O
' device.
Private MustInherit Class Resource
End Class
' Represents a memory resource. For brevity, the details of
' this class are omitted.
Private Class MemoryResource
Inherits Resource
End Class
' Represents a network resource. For brevity, the details of
' this class are omitted.
Private Class NetworkResource
Inherits Resource
End Class
' Represents a file resource. For brevity, the details of
' this class are omitted.
Private Class FileResource
Inherits Resource
End Class
Shared Sub Main(ByVal args() As String)
' Create three BufferBlock<T> objects. Each object holds a different
' type of resource.
Dim networkResources = New BufferBlock(Of NetworkResource)()
Dim fileResources = New BufferBlock(Of FileResource)()
Dim memoryResources = New BufferBlock(Of MemoryResource)()
' Create two non-greedy JoinBlock<T1, T2> objects.
' The first join works with network and memory resources;
' the second pool works with file and memory resources.
Dim joinNetworkAndMemoryResources = New JoinBlock(Of NetworkResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})
Dim joinFileAndMemoryResources = New JoinBlock(Of FileResource, MemoryResource)(New GroupingDataflowBlockOptions With {.Greedy = False})
' Create two ActionBlock<T> objects.
' The first block acts on a network resource and a memory resource.
' The second block acts on a file resource and a memory resource.
Dim networkMemoryAction = New ActionBlock(Of Tuple(Of NetworkResource, MemoryResource))(Sub(data)
' Perform some action on the resources.
' Print a message.
' Simulate a lengthy operation that uses the resources.
' Print a message.
' Release the resources back to their respective pools.
Console.WriteLine("Network worker: using resources...")
Thread.Sleep(New Random().Next(500, 2000))
Console.WriteLine("Network worker: finished using resources...")
networkResources.Post(data.Item1)
memoryResources.Post(data.Item2)
End Sub)
Dim fileMemoryAction = New ActionBlock(Of Tuple(Of FileResource, MemoryResource))(Sub(data)
' Perform some action on the resources.
' Print a message.
' Simulate a lengthy operation that uses the resources.
' Print a message.
' Release the resources back to their respective pools.
Console.WriteLine("File worker: using resources...")
Thread.Sleep(New Random().Next(500, 2000))
Console.WriteLine("File worker: finished using resources...")
fileResources.Post(data.Item1)
memoryResources.Post(data.Item2)
End Sub)
' Link the resource pools to the JoinBlock<T1, T2> objects.
' Because these join blocks operate in non-greedy mode, they do not
' take the resource from a pool until all resources are available from
' all pools.
networkResources.LinkTo(joinNetworkAndMemoryResources.Target1)
memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2)
fileResources.LinkTo(joinFileAndMemoryResources.Target1)
memoryResources.LinkTo(joinFileAndMemoryResources.Target2)
' Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.
joinNetworkAndMemoryResources.LinkTo(networkMemoryAction)
joinFileAndMemoryResources.LinkTo(fileMemoryAction)
' Populate the resource pools. In this example, network and
' file resources are more abundant than memory resources.
networkResources.Post(New NetworkResource())
networkResources.Post(New NetworkResource())
networkResources.Post(New NetworkResource())
memoryResources.Post(New MemoryResource())
fileResources.Post(New FileResource())
fileResources.Post(New FileResource())
fileResources.Post(New FileResource())
' Allow data to flow through the network for several seconds.
Thread.Sleep(10000)
End Sub
End Class
' Sample output:
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'File worker: using resources...
'File worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'Network worker: using resources...
'Network worker: finished using resources...
'File worker: using resources...
'
Für eine effiziente Nutzung des freigegebenen Pools von MemoryResource
-Objekten wird in diesem Beispiel ein GroupingDataflowBlockOptions-Objekt angegeben, für das die Greedy-Eigenschaft auf False
festgelegt ist. Auf diese Weise werden JoinBlock<T1,T2>-Objekte erzeugt, die im nicht gierigen Modus agieren. Ein nicht gieriger Gruppierungsblock stellt alle eingehenden Nachrichten zurück, bis eine aus jeder Quelle verfügbar ist. Wenn eine der zurückgestellten Nachrichten durch einen anderen Block akzeptiert wurde, wird der Prozess durch den Gruppierungsblock neu gestartet. Der nicht gierige Modus bietet die Möglichkeit, dass Gruppierungsblöcke, die mindestens einen Quellblock gemeinsam nutzen, weiter abgearbeitet werden, während andere Blöcke auf Daten warten. Wenn in diesem Beispiel ein MemoryResource
-Objekt zum memoryResources
-Pool hinzugefügt wird, kann der erste Gruppierungsblock zum Empfangen der zweiten Datenquelle weiter ausgeführt werden. Wenn in diesem Beispiel der gierige Modus (also der Standardmodus) verwendet wird, könnte ein Gruppierungsblock das MemoryResource
-Objekt akzeptieren und warten, bis die zweite Ressource verfügbar ist. Wenn der andere Gruppierungsblock jedoch seine zweite Datenquelle zur Verfügung hat, kann er keinen Fortschritt machen, da das MemoryResource
-Objekt vom anderen Gruppierungsblock akzeptiert wurde.
Stabile Programmierung
Die Verwendung von nicht gierigen Gruppierungen kann Ihnen auch dabei helfen, einen Deadlock in Ihrer Anwendung zu vermeiden. In einer Softwareanwendung kommt es zu einem Deadlock, wenn zwei oder mehr Prozesse jeweils eine Ressource halten und gegenseitig darauf warten, dass ein anderer Prozess eine andere Ressource freigibt. Betrachten Sie eine Anwendung, die zwei JoinBlock<T1,T2>-Objekte definiert. Beide Objekte lesen jeweils Daten aus zwei gemeinsam genutzten Quellblöcken. Wenn ein Gruppierungsblock im gierigen Modus aus der ersten Quelle und der zweite Gruppierungsblock aus der zweiten Quelle liest, könnte die Anwendung blockieren, weil beide Gruppierungsblöcke gegenseitig darauf warten, dass der andere seine Ressource freigibt. Im nicht gierigen Modus liest jeder Gruppierungsblock nur dann aus seinen Quellen, wenn alle Daten verfügbar sind und somit das Risiko eines Deadlocks ausgeschlossen ist.