Procedure: Actie uitvoeren wanneer een gegevensstroomblok gegevens ontvangt
Bloktypen voor uitvoeringsgegevensstromen roepen een door de gebruiker opgegeven gemachtigde aan wanneer ze gegevens ontvangen. De System.Threading.Tasks.Dataflow.ActionBlock<TInput>typen , System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput>en System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput> klassen zijn bloktypen voor uitvoeringsgegevensstromen. U kunt het delegate
trefwoord (Sub
in Visual Basic), Func<T,TResult>Action<T>of een lambda-expressie gebruiken wanneer u een werkfunctie opgeeft aan een uitvoeringsgegevensstroomblok. In dit document wordt beschreven hoe u lambda-expressies gebruikt Func<T,TResult> om actie uit te voeren in uitvoeringsblokken.
Notitie
De TPL-gegevensstroombibliotheek (de System.Threading.Tasks.Dataflow naamruimte) wordt niet gedistribueerd met .NET. Als u de System.Threading.Tasks.Dataflow naamruimte in Visual Studio wilt installeren, opent u uw project, kiest u NuGet-pakketten beheren in het menu Project en zoekt u online naar het System.Threading.Tasks.Dataflow
pakket. U kunt het ook installeren met behulp van de .NET Core CLI.dotnet add package System.Threading.Tasks.Dataflow
Opmerking
In het volgende voorbeeld wordt een gegevensstroom gebruikt om een bestand van schijf te lezen en wordt het aantal bytes in dat bestand berekend dat gelijk is aan nul. Het gebruikt TransformBlock<TInput,TOutput> om het bestand te lezen en het aantal nul bytes te berekenen en ActionBlock<TInput> om het aantal nul bytes af te drukken op de console. Het TransformBlock<TInput,TOutput> object geeft een Func<T,TResult> object op dat moet worden uitgevoerd wanneer de blokken gegevens ontvangen. Het ActionBlock<TInput> object maakt gebruik van een lambda-expressie om af te drukken naar de console het aantal nul bytes dat wordt gelezen.
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to provide delegates to exectution dataflow blocks.
class DataflowExecutionBlocks
{
// Computes the number of zero bytes that the provided file
// contains.
static int CountBytes(string path)
{
byte[] buffer = new byte[1024];
int totalZeroBytesRead = 0;
using (var fileStream = File.OpenRead(path))
{
int bytesRead = 0;
do
{
bytesRead = fileStream.Read(buffer, 0, buffer.Length);
totalZeroBytesRead += buffer.Count(b => b == 0);
} while (bytesRead > 0);
}
return totalZeroBytesRead;
}
static void Main(string[] args)
{
// Create a temporary file on disk.
string tempFile = Path.GetTempFileName();
// Write random data to the temporary file.
using (var fileStream = File.OpenWrite(tempFile))
{
Random rand = new Random();
byte[] buffer = new byte[1024];
for (int i = 0; i < 512; i++)
{
rand.NextBytes(buffer);
fileStream.Write(buffer, 0, buffer.Length);
}
}
// Create an ActionBlock<int> object that prints to the console
// the number of bytes read.
var printResult = new ActionBlock<int>(zeroBytesRead =>
{
Console.WriteLine("{0} contains {1} zero bytes.",
Path.GetFileName(tempFile), zeroBytesRead);
});
// Create a TransformBlock<string, int> object that calls the
// CountBytes function and returns its result.
var countBytes = new TransformBlock<string, int>(
new Func<string, int>(CountBytes));
// Link the TransformBlock<string, int> object to the
// ActionBlock<int> object.
countBytes.LinkTo(printResult);
// Create a continuation task that completes the ActionBlock<int>
// object when the TransformBlock<string, int> finishes.
countBytes.Completion.ContinueWith(delegate { printResult.Complete(); });
// Post the path to the temporary file to the
// TransformBlock<string, int> object.
countBytes.Post(tempFile);
// Requests completion of the TransformBlock<string, int> object.
countBytes.Complete();
// Wait for the ActionBlock<int> object to print the message.
printResult.Completion.Wait();
// Delete the temporary file.
File.Delete(tempFile);
}
}
/* Sample output:
tmp4FBE.tmp contains 2081 zero bytes.
*/
Imports System.IO
Imports System.Linq
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to provide delegates to exectution dataflow blocks.
Friend Class DataflowExecutionBlocks
' Computes the number of zero bytes that the provided file
' contains.
Private Shared Function CountBytes(ByVal path As String) As Integer
Dim buffer(1023) As Byte
Dim totalZeroBytesRead As Integer = 0
Using fileStream = File.OpenRead(path)
Dim bytesRead As Integer = 0
Do
bytesRead = fileStream.Read(buffer, 0, buffer.Length)
totalZeroBytesRead += buffer.Count(Function(b) b = 0)
Loop While bytesRead > 0
End Using
Return totalZeroBytesRead
End Function
Shared Sub Main(ByVal args() As String)
' Create a temporary file on disk.
Dim tempFile As String = Path.GetTempFileName()
' Write random data to the temporary file.
Using fileStream = File.OpenWrite(tempFile)
Dim rand As New Random()
Dim buffer(1023) As Byte
For i As Integer = 0 To 511
rand.NextBytes(buffer)
fileStream.Write(buffer, 0, buffer.Length)
Next i
End Using
' Create an ActionBlock<int> object that prints to the console
' the number of bytes read.
Dim printResult = New ActionBlock(Of Integer)(Sub(zeroBytesRead) Console.WriteLine("{0} contains {1} zero bytes.", Path.GetFileName(tempFile), zeroBytesRead))
' Create a TransformBlock<string, int> object that calls the
' CountBytes function and returns its result.
Dim countBytes = New TransformBlock(Of String, Integer)(New Func(Of String, Integer)(AddressOf DataflowExecutionBlocks.CountBytes))
' Link the TransformBlock<string, int> object to the
' ActionBlock<int> object.
countBytes.LinkTo(printResult)
' Create a continuation task that completes the ActionBlock<int>
' object when the TransformBlock<string, int> finishes.
countBytes.Completion.ContinueWith(Sub() printResult.Complete())
' Post the path to the temporary file to the
' TransformBlock<string, int> object.
countBytes.Post(tempFile)
' Requests completion of the TransformBlock<string, int> object.
countBytes.Complete()
' Wait for the ActionBlock<int> object to print the message.
printResult.Completion.Wait()
' Delete the temporary file.
File.Delete(tempFile)
End Sub
End Class
' Sample output:
'tmp4FBE.tmp contains 2081 zero bytes.
'
Hoewel u een lambda-expressie kunt opgeven voor een TransformBlock<TInput,TOutput> object, gebruikt Func<T,TResult> dit voorbeeld om andere code in te schakelen voor het gebruik van de CountBytes
methode. Het ActionBlock<TInput> object maakt gebruik van een lambda-expressie omdat het werk dat moet worden uitgevoerd specifiek is voor deze taak en waarschijnlijk niet nuttig is vanuit andere code. Zie Lambda-expressies in PLINQ en TPL voor meer informatie over de werking van lambda-expressies in de taakparallelbibliotheek.
De sectie Samenvatting van gedelegeerdentypen in het document Gegevensstroom bevat een overzicht van de gedelegeerdentypen die u kunt opgeven ActionBlock<TInput>, TransformBlock<TInput,TOutput>en TransformManyBlock<TInput,TOutput> objecten. In de tabel wordt ook aangegeven of het type gemachtigde synchroon of asynchroon werkt.
Robuuste programmering
Dit voorbeeld bevat een gemachtigde van het type Func<T,TResult> voor het TransformBlock<TInput,TOutput> object om de taak van het gegevensstroomblok synchroon uit te voeren. Als u wilt dat het gegevensstroomblok asynchroon werkt, geeft u een gemachtigde van het type Func<T, Task<TResult>>
op voor het gegevensstroomblok. Wanneer een gegevensstroomblok asynchroon werkt, wordt de taak van het gegevensstroomblok alleen voltooid wanneer het geretourneerde Task<TResult> object is voltooid. In het volgende voorbeeld wordt de CountBytes
methode gewijzigd en worden de asynchrone operators (Asyncand Await in Visual Basic) gebruikt om asynchroon het totale aantal bytes te berekenen dat nul is in het opgegeven bestand. Met de ReadAsync methode worden bestandsleesbewerkingen asynchroon uitgevoerd.
// Asynchronously computes the number of zero bytes that the provided file
// contains.
static async Task<int> CountBytesAsync(string path)
{
byte[] buffer = new byte[1024];
int totalZeroBytesRead = 0;
using (var fileStream = new FileStream(
path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
{
int bytesRead = 0;
do
{
// Asynchronously read from the file stream.
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
totalZeroBytesRead += buffer.Count(b => b == 0);
} while (bytesRead > 0);
}
return totalZeroBytesRead;
}
' Asynchronously computes the number of zero bytes that the provided file
' contains.
Private Shared async Function CountBytesAsync(ByVal path As String) As Task(Of Integer)
Dim buffer(1023) As Byte
Dim totalZeroBytesRead As Integer = 0
Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
Dim bytesRead As Integer = 0
Do
' Asynchronously read from the file stream.
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
totalZeroBytesRead += buffer.Count(Function(b) b = 0)
Loop While bytesRead > 0
End Using
Return totalZeroBytesRead
End Function
U kunt ook asynchrone lambda-expressies gebruiken om actie uit te voeren in een uitvoeringsgegevensstroomblok. In het volgende voorbeeld wordt het TransformBlock<TInput,TOutput> object gewijzigd dat in het vorige voorbeeld wordt gebruikt, zodat er een lambda-expressie wordt gebruikt om het werk asynchroon uit te voeren.
// Create a TransformBlock<string, int> object that calls the
// CountBytes function and returns its result.
var countBytesAsync = new TransformBlock<string, int>(async path =>
{
byte[] buffer = new byte[1024];
int totalZeroBytesRead = 0;
using (var fileStream = new FileStream(
path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
{
int bytesRead = 0;
do
{
// Asynchronously read from the file stream.
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
totalZeroBytesRead += buffer.Count(b => b == 0);
} while (bytesRead > 0);
}
return totalZeroBytesRead;
});
' Create a TransformBlock<string, int> object that calls the
' CountBytes function and returns its result.
Dim countBytesAsync = New TransformBlock(Of String, Integer)(async Function(path)
' Asynchronously read from the file stream.
Dim buffer(1023) As Byte
Dim totalZeroBytesRead As Integer = 0
Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
Dim bytesRead As Integer = 0
Do
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
totalZeroBytesRead += buffer.Count(Function(b) b = 0)
Loop While bytesRead > 0
End Using
Return totalZeroBytesRead
End Function)