Vorgehensweise: Aufheben der Verknüpfungen von Datenflussblöcken
In diesem Dokument erfahren Sie, wie Sie eine Verknüpfung zwischen einen Zieldatenflussblock und der zugehörigen Quelle auflösen.
Hinweis
Die TPL-Datenflussbibliothek (System.Threading.Tasks.Dataflow-Namespace) wird nicht mit .NET ausgeliefert. Öffnen Sie zum Installieren des System.Threading.Tasks.Dataflow-Namespace in Visual Studio Ihr Projekt, wählen Sie im Menü Projekt die Option NuGet-Pakete verwalten aus, und suchen Sie online nach dem System.Threading.Tasks.Dataflow
-Paket. Alternativ können Sie es mithilfe der .NET Core-CLI installieren und dazu dotnet add package System.Threading.Tasks.Dataflow
ausführen.
Beispiel
Im folgenden Beispiel werden drei TransformBlock<TInput,TOutput>-Objekte erstellt, die jeweils die TrySolution
-Methode aufrufen, um einen Wert zu berechnen. In diesem Beispiel wird nur das Ergebnis vom ersten Aufruf von TrySolution
zum Beenden benötigt.
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to unlink dataflow blocks.
class DataflowReceiveAny
{
// Receives the value from the first provided source that has
// a message.
public static T ReceiveFromAny<T>(params ISourceBlock<T>[] sources)
{
// Create a WriteOnceBlock<T> object and link it to each source block.
var writeOnceBlock = new WriteOnceBlock<T>(e => e);
foreach (var source in sources)
{
// Setting MaxMessages to one instructs
// the source block to unlink from the WriteOnceBlock<T> object
// after offering the WriteOnceBlock<T> object one message.
source.LinkTo(writeOnceBlock, new DataflowLinkOptions { MaxMessages = 1 });
}
// Return the first value that is offered to the WriteOnceBlock object.
return writeOnceBlock.Receive();
}
// Demonstrates a function that takes several seconds to produce a result.
static int TrySolution(int n, CancellationToken ct)
{
// Simulate a lengthy operation that completes within three seconds
// or when the provided CancellationToken object is cancelled.
SpinWait.SpinUntil(() => ct.IsCancellationRequested,
new Random().Next(3000));
// Return a value.
return n + 42;
}
static void Main(string[] args)
{
// Create a shared CancellationTokenSource object to enable the
// TrySolution method to be cancelled.
var cts = new CancellationTokenSource();
// Create three TransformBlock<int, int> objects.
// Each TransformBlock<int, int> object calls the TrySolution method.
Func<int, int> action = n => TrySolution(n, cts.Token);
var trySolution1 = new TransformBlock<int, int>(action);
var trySolution2 = new TransformBlock<int, int>(action);
var trySolution3 = new TransformBlock<int, int>(action);
// Post data to each TransformBlock<int, int> object.
trySolution1.Post(11);
trySolution2.Post(21);
trySolution3.Post(31);
// Call the ReceiveFromAny<T> method to receive the result from the
// first TransformBlock<int, int> object to finish.
int result = ReceiveFromAny(trySolution1, trySolution2, trySolution3);
// Cancel all calls to TrySolution that are still active.
cts.Cancel();
// Print the result to the console.
Console.WriteLine("The solution is {0}.", result);
cts.Dispose();
}
}
/* Sample output:
The solution is 53.
*/
Imports System.Threading
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to unlink dataflow blocks.
Friend Class DataflowReceiveAny
' Receives the value from the first provided source that has
' a message.
Public Shared Function ReceiveFromAny(Of T)(ParamArray ByVal sources() As ISourceBlock(Of T)) As T
' Create a WriteOnceBlock<T> object and link it to each source block.
Dim writeOnceBlock = New WriteOnceBlock(Of T)(Function(e) e)
For Each source In sources
' Setting MaxMessages to one instructs
' the source block to unlink from the WriteOnceBlock<T> object
' after offering the WriteOnceBlock<T> object one message.
source.LinkTo(writeOnceBlock, New DataflowLinkOptions With {.MaxMessages = 1})
Next source
' Return the first value that is offered to the WriteOnceBlock object.
Return writeOnceBlock.Receive()
End Function
' Demonstrates a function that takes several seconds to produce a result.
Private Shared Function TrySolution(ByVal n As Integer, ByVal ct As CancellationToken) As Integer
' Simulate a lengthy operation that completes within three seconds
' or when the provided CancellationToken object is cancelled.
SpinWait.SpinUntil(Function() ct.IsCancellationRequested, New Random().Next(3000))
' Return a value.
Return n + 42
End Function
Shared Sub Main(ByVal args() As String)
' Create a shared CancellationTokenSource object to enable the
' TrySolution method to be cancelled.
Dim cts = New CancellationTokenSource()
' Create three TransformBlock<int, int> objects.
' Each TransformBlock<int, int> object calls the TrySolution method.
Dim action As Func(Of Integer, Integer) = Function(n) TrySolution(n, cts.Token)
Dim trySolution1 = New TransformBlock(Of Integer, Integer)(action)
Dim trySolution2 = New TransformBlock(Of Integer, Integer)(action)
Dim trySolution3 = New TransformBlock(Of Integer, Integer)(action)
' Post data to each TransformBlock<int, int> object.
trySolution1.Post(11)
trySolution2.Post(21)
trySolution3.Post(31)
' Call the ReceiveFromAny<T> method to receive the result from the
' first TransformBlock<int, int> object to finish.
Dim result As Integer = ReceiveFromAny(trySolution1, trySolution2, trySolution3)
' Cancel all calls to TrySolution that are still active.
cts.Cancel()
' Print the result to the console.
Console.WriteLine("The solution is {0}.", result)
cts.Dispose()
End Sub
End Class
' Sample output:
'The solution is 53.
'
Um den Wert aus dem ersten TransformBlock<TInput,TOutput>-Objekt zu erhalten, das beendet wird, wird in diesem Beispiel die ReceiveFromAny(T)
-Methode definiert. Die ReceiveFromAny(T)
-Methode akzeptiert ein Array aus ISourceBlock<TOutput>-Objekten und verknüpft jedes dieser Objekte mit einem WriteOnceBlock<T>-Objekt. Wenn Sie die LinkTo-Methode aufrufen, um einen Quelldatenflussblock mit einem Zieldatenflussblock zu verknüpfen, überträgt der Quelldatenflussblock Daten an den Zielblock, sobald welche verfügbar sind. Da die WriteOnceBlock<T>-Klasse nur die erste angebotene Nachricht akzeptiert, erzeugt die ReceiveFromAny(T)
-Methode ihr Ergebnis durch Aufrufen der Receive-Methode. Hierdurch wird die erste Nachricht erzeugt, die dem WriteOnceBlock<T>-Objekt angeboten wird. Die LinkTo-Methode weist eine überladene Version auf, die ein DataflowLinkOptions-Objekt mit der Eigenschaft MaxMessages akzeptiert, wenn diese auf 1
festgelegt ist. Durch dieses Objekt wird der Quellblock angewiesen, die Verknüpfung mit dem Ziel zu lösen, nachdem das Ziel eine Nachricht von der Quelle erhalten hat. Es ist wichtig, dass das WriteOnceBlock<T>-Objekt von seinen Quellen getrennt wird, da die Beziehung zwischen dem Array der Quellen und dem WriteOnceBlock<T>-Objekt nicht mehr benötigt wird, nachdem das WriteOnceBlock<T>-Objekt eine Nachricht erhalten hat.
Damit die verbleibenden Aufrufe von TrySolution
beendet werden, sobald einer einen Wert zurückgegeben hat, übernimmt die TrySolution
-Methode ein CancellationToken-Objekt, das abgebrochen wird, nachdem der Aufruf von ReceiveFromAny(T)
zurückgegeben wurde. Die SpinUntil-Methode gibt zurück, wann dieses CancellationToken-Objekt abgebrochen wird.