Vorgehensweise: Ausführen einer Aktion, wenn ein Datenflussblock Daten empfängt
Ausführungsdatenflussblock-Typen rufen einen vom Benutzer bereitgestellten Delegaten auf, wenn sie Daten empfangen. Die Klassen System.Threading.Tasks.Dataflow.ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput> und System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput> sind Typen von Ausführungsdatenflussblöcken. Sie können das Schlüsselwort delegate
(Sub
in Visual Basic), Action<T>, Func<T,TResult> oder einen Lambdaausdruck verwenden, wenn Sie eine Arbeitsfunktion für einen Ausführungsdatenflussblock bereitstellen. In diesem Dokument wird beschrieben, wie Func<T,TResult> und Lambdaausdrücke verwendet werden, um Aktionen in Ausführungsblöcken auszuführen.
Hinweis
Die TPL-Datenflussbibliothek (System.Threading.Tasks.Dataflow-Namespace) wird nicht mit .NET ausgeliefert. Öffnen Sie zum Installieren des System.Threading.Tasks.Dataflow-Namespace in Visual Studio Ihr Projekt, wählen Sie im Menü Projekt die Option NuGet-Pakete verwalten aus, und suchen Sie online nach dem System.Threading.Tasks.Dataflow
-Paket. Alternativ können Sie es mithilfe der .NET Core-CLI installieren und dazu dotnet add package System.Threading.Tasks.Dataflow
ausführen.
Beispiel
Im folgenden Beispiel wird der Datenfluss verwendet, um eine Datei von einem Datenträger zu lesen und die Anzahl der Bytes in dieser Datei zu berechnen, die gleich 0 sind. Zum Lesen der Datei und berechnen der Anzahl von 0 (null) Bytes wird TransformBlock<TInput,TOutput> verwendet, und ActionBlock<TInput> dient dazu, die Anzahl von 0 Bytes auf der Konsole auszugegeben. Das TransformBlock<TInput,TOutput>-Objekt legt ein Func<T,TResult>-Objekt fest, um Arbeitsvorgänge auszuführen, wenn die Blöcke Daten empfangen. Das ActionBlock<TInput>-Objekt gibt mithilfe eines Lambdaausdrucks die Anzahl der gelesenen 0 Bytes auf der Konsole aus.
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to provide delegates to exectution dataflow blocks.
class DataflowExecutionBlocks
{
// Computes the number of zero bytes that the provided file
// contains.
static int CountBytes(string path)
{
byte[] buffer = new byte[1024];
int totalZeroBytesRead = 0;
using (var fileStream = File.OpenRead(path))
{
int bytesRead = 0;
do
{
bytesRead = fileStream.Read(buffer, 0, buffer.Length);
totalZeroBytesRead += buffer.Count(b => b == 0);
} while (bytesRead > 0);
}
return totalZeroBytesRead;
}
static void Main(string[] args)
{
// Create a temporary file on disk.
string tempFile = Path.GetTempFileName();
// Write random data to the temporary file.
using (var fileStream = File.OpenWrite(tempFile))
{
Random rand = new Random();
byte[] buffer = new byte[1024];
for (int i = 0; i < 512; i++)
{
rand.NextBytes(buffer);
fileStream.Write(buffer, 0, buffer.Length);
}
}
// Create an ActionBlock<int> object that prints to the console
// the number of bytes read.
var printResult = new ActionBlock<int>(zeroBytesRead =>
{
Console.WriteLine("{0} contains {1} zero bytes.",
Path.GetFileName(tempFile), zeroBytesRead);
});
// Create a TransformBlock<string, int> object that calls the
// CountBytes function and returns its result.
var countBytes = new TransformBlock<string, int>(
new Func<string, int>(CountBytes));
// Link the TransformBlock<string, int> object to the
// ActionBlock<int> object.
countBytes.LinkTo(printResult);
// Create a continuation task that completes the ActionBlock<int>
// object when the TransformBlock<string, int> finishes.
countBytes.Completion.ContinueWith(delegate { printResult.Complete(); });
// Post the path to the temporary file to the
// TransformBlock<string, int> object.
countBytes.Post(tempFile);
// Requests completion of the TransformBlock<string, int> object.
countBytes.Complete();
// Wait for the ActionBlock<int> object to print the message.
printResult.Completion.Wait();
// Delete the temporary file.
File.Delete(tempFile);
}
}
/* Sample output:
tmp4FBE.tmp contains 2081 zero bytes.
*/
Imports System.IO
Imports System.Linq
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to provide delegates to exectution dataflow blocks.
Friend Class DataflowExecutionBlocks
' Computes the number of zero bytes that the provided file
' contains.
Private Shared Function CountBytes(ByVal path As String) As Integer
Dim buffer(1023) As Byte
Dim totalZeroBytesRead As Integer = 0
Using fileStream = File.OpenRead(path)
Dim bytesRead As Integer = 0
Do
bytesRead = fileStream.Read(buffer, 0, buffer.Length)
totalZeroBytesRead += buffer.Count(Function(b) b = 0)
Loop While bytesRead > 0
End Using
Return totalZeroBytesRead
End Function
Shared Sub Main(ByVal args() As String)
' Create a temporary file on disk.
Dim tempFile As String = Path.GetTempFileName()
' Write random data to the temporary file.
Using fileStream = File.OpenWrite(tempFile)
Dim rand As New Random()
Dim buffer(1023) As Byte
For i As Integer = 0 To 511
rand.NextBytes(buffer)
fileStream.Write(buffer, 0, buffer.Length)
Next i
End Using
' Create an ActionBlock<int> object that prints to the console
' the number of bytes read.
Dim printResult = New ActionBlock(Of Integer)(Sub(zeroBytesRead) Console.WriteLine("{0} contains {1} zero bytes.", Path.GetFileName(tempFile), zeroBytesRead))
' Create a TransformBlock<string, int> object that calls the
' CountBytes function and returns its result.
Dim countBytes = New TransformBlock(Of String, Integer)(New Func(Of String, Integer)(AddressOf DataflowExecutionBlocks.CountBytes))
' Link the TransformBlock<string, int> object to the
' ActionBlock<int> object.
countBytes.LinkTo(printResult)
' Create a continuation task that completes the ActionBlock<int>
' object when the TransformBlock<string, int> finishes.
countBytes.Completion.ContinueWith(Sub() printResult.Complete())
' Post the path to the temporary file to the
' TransformBlock<string, int> object.
countBytes.Post(tempFile)
' Requests completion of the TransformBlock<string, int> object.
countBytes.Complete()
' Wait for the ActionBlock<int> object to print the message.
printResult.Completion.Wait()
' Delete the temporary file.
File.Delete(tempFile)
End Sub
End Class
' Sample output:
'tmp4FBE.tmp contains 2081 zero bytes.
'
Obwohl Sie für ein TransformBlock<TInput,TOutput>-Objekt einen Lambdaausdruck bereitstellen können, wird in diesem Beispiel Func<T,TResult> verwendet, damit anderer Code die CountBytes
-Methode anwenden kann. Das ActionBlock<TInput>-Objekt verwendet einen Lambdaausdruck, da die auszuführende Arbeit für diesen Task spezifisch und mit anderem Code wahrscheinlich nicht hilfreich ist. Weitere Informationen zur Funktionsweise von Lambda-Ausdrücken in der Task Parallel Library finden Sie unter Lambda Expressions in PLINQ and TPL (Lambda-Ausdrücke in PLINQ und TPL).
Der Abschnitt mit der Übersicht über die Delegattypen im Dokument Datenfluss fasst die Delegattypen zusammen, die Sie für die Objekte ActionBlock<TInput>, TransformBlock<TInput,TOutput> und TransformManyBlock<TInput,TOutput> zur Verfügung stellen können. In der Tabelle wird auch angegeben, ob der Delegattyp synchron oder asynchron arbeitet.
Stabile Programmierung
Dieses Beispiel enthält einen Delegaten vom Typ Func<T,TResult> für das TransformBlock<TInput,TOutput>-Objekt, um den Task des Datenflussblocks synchron auszuführen. Damit das asynchrone Verhalten des Datenflussblocks aktiviert wird, stellen Sie einen Delegaten vom Typ Func<T, Task<TResult>>
für den Datenflussblock bereit. Wenn sich ein Datenflussblock asynchron verhält, ist der Task des Datenflussblocks nur dann abgeschlossen, wenn das zurückgegebene Task<TResult>-Objekt beendet wird. Das folgende Beispiel ändert die CountBytes
-Methode und verwendet die Operatoren async und await (Async und Await in Visual Basic), um die Gesamtzahl der Bytes in der bereitgestellten Datei asynchron zu berechnen, die 0 (null) sind. Die ReadAsync-Methode führt asynchrone Dateilesevorgänge aus.
// Asynchronously computes the number of zero bytes that the provided file
// contains.
static async Task<int> CountBytesAsync(string path)
{
byte[] buffer = new byte[1024];
int totalZeroBytesRead = 0;
using (var fileStream = new FileStream(
path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
{
int bytesRead = 0;
do
{
// Asynchronously read from the file stream.
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
totalZeroBytesRead += buffer.Count(b => b == 0);
} while (bytesRead > 0);
}
return totalZeroBytesRead;
}
' Asynchronously computes the number of zero bytes that the provided file
' contains.
Private Shared async Function CountBytesAsync(ByVal path As String) As Task(Of Integer)
Dim buffer(1023) As Byte
Dim totalZeroBytesRead As Integer = 0
Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
Dim bytesRead As Integer = 0
Do
' Asynchronously read from the file stream.
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
totalZeroBytesRead += buffer.Count(Function(b) b = 0)
Loop While bytesRead > 0
End Using
Return totalZeroBytesRead
End Function
Sie können auch asynchrone Lambda-Ausdrücke verwenden, um eine Aktion in einem Ausführungsdatenflussblock auszuführen. Das folgende Beispiel ändert das im vorherigen Beispiel verwendete TransformBlock<TInput,TOutput>-Objekt, sodass die Arbeit mithilfe eines Lambdaausdrucks asynchron ausgeführt wird.
// Create a TransformBlock<string, int> object that calls the
// CountBytes function and returns its result.
var countBytesAsync = new TransformBlock<string, int>(async path =>
{
byte[] buffer = new byte[1024];
int totalZeroBytesRead = 0;
using (var fileStream = new FileStream(
path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
{
int bytesRead = 0;
do
{
// Asynchronously read from the file stream.
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
totalZeroBytesRead += buffer.Count(b => b == 0);
} while (bytesRead > 0);
}
return totalZeroBytesRead;
});
' Create a TransformBlock<string, int> object that calls the
' CountBytes function and returns its result.
Dim countBytesAsync = New TransformBlock(Of String, Integer)(async Function(path)
' Asynchronously read from the file stream.
Dim buffer(1023) As Byte
Dim totalZeroBytesRead As Integer = 0
Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
Dim bytesRead As Integer = 0
Do
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
totalZeroBytesRead += buffer.Count(Function(b) b = 0)
Loop While bytesRead > 0
End Using
Return totalZeroBytesRead
End Function)