Como: executar ações quando um bloco de fluxo de dados recebe dados
Os tipos de Blocos de fluxo de dados de execução chamam um representante fornecido pelo usuário ao receber dados. As classes System.Threading.Tasks.Dataflow.ActionBlock<TInput>, System.Threading.Tasks.Dataflow.TransformBlock<TInput,TOutput> e System.Threading.Tasks.Dataflow.TransformManyBlock<TInput,TOutput> são tipos de blocos de fluxo de dados de execução. Você pode usar a palavra-chave delegate
(Sub
em Visual Basic), Action<T>, Func<T,TResult> ou uma expressão lambda ao fornecer uma função de trabalho a um bloco de fluxo de dados de execução. Este documento descreve como usar Func<T,TResult> e as expressões lambda para executar a ação em blocos de execução.
Observação
A Biblioteca de Fluxo de Dados TPL (o namespace System.Threading.Tasks.Dataflow) não é distribuída com o .NET. Para instalar o namespace System.Threading.Tasks.Dataflow no Visual Studio, abra o projeto, escolha Gerenciar Pacotes NuGet no menu Projeto e pesquise online o pacote System.Threading.Tasks.Dataflow
. Como alternativa, instale-o usando a CLI do .NET Core e execute dotnet add package System.Threading.Tasks.Dataflow
.
Exemplo
O exemplo a seguir usa o fluxo de dados para ler um arquivo do disco e calcula o número de bytes no arquivo que são iguais a zero. Ele usa TransformBlock<TInput,TOutput> para ler o arquivo e calcula o número de zero bytes e ActionBlock<TInput> para imprimir o número de zero bytes no console. O objeto TransformBlock<TInput,TOutput> especifica um objeto Func<T,TResult> para executar o trabalho quando os blocos recebem dados. O objeto ActionBlock<TInput> usa uma expressão lambda para imprimir no console o número de zero bytes que são lidos.
using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to provide delegates to exectution dataflow blocks.
class DataflowExecutionBlocks
{
// Computes the number of zero bytes that the provided file
// contains.
static int CountBytes(string path)
{
byte[] buffer = new byte[1024];
int totalZeroBytesRead = 0;
using (var fileStream = File.OpenRead(path))
{
int bytesRead = 0;
do
{
bytesRead = fileStream.Read(buffer, 0, buffer.Length);
totalZeroBytesRead += buffer.Count(b => b == 0);
} while (bytesRead > 0);
}
return totalZeroBytesRead;
}
static void Main(string[] args)
{
// Create a temporary file on disk.
string tempFile = Path.GetTempFileName();
// Write random data to the temporary file.
using (var fileStream = File.OpenWrite(tempFile))
{
Random rand = new Random();
byte[] buffer = new byte[1024];
for (int i = 0; i < 512; i++)
{
rand.NextBytes(buffer);
fileStream.Write(buffer, 0, buffer.Length);
}
}
// Create an ActionBlock<int> object that prints to the console
// the number of bytes read.
var printResult = new ActionBlock<int>(zeroBytesRead =>
{
Console.WriteLine("{0} contains {1} zero bytes.",
Path.GetFileName(tempFile), zeroBytesRead);
});
// Create a TransformBlock<string, int> object that calls the
// CountBytes function and returns its result.
var countBytes = new TransformBlock<string, int>(
new Func<string, int>(CountBytes));
// Link the TransformBlock<string, int> object to the
// ActionBlock<int> object.
countBytes.LinkTo(printResult);
// Create a continuation task that completes the ActionBlock<int>
// object when the TransformBlock<string, int> finishes.
countBytes.Completion.ContinueWith(delegate { printResult.Complete(); });
// Post the path to the temporary file to the
// TransformBlock<string, int> object.
countBytes.Post(tempFile);
// Requests completion of the TransformBlock<string, int> object.
countBytes.Complete();
// Wait for the ActionBlock<int> object to print the message.
printResult.Completion.Wait();
// Delete the temporary file.
File.Delete(tempFile);
}
}
/* Sample output:
tmp4FBE.tmp contains 2081 zero bytes.
*/
Imports System.IO
Imports System.Linq
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to provide delegates to exectution dataflow blocks.
Friend Class DataflowExecutionBlocks
' Computes the number of zero bytes that the provided file
' contains.
Private Shared Function CountBytes(ByVal path As String) As Integer
Dim buffer(1023) As Byte
Dim totalZeroBytesRead As Integer = 0
Using fileStream = File.OpenRead(path)
Dim bytesRead As Integer = 0
Do
bytesRead = fileStream.Read(buffer, 0, buffer.Length)
totalZeroBytesRead += buffer.Count(Function(b) b = 0)
Loop While bytesRead > 0
End Using
Return totalZeroBytesRead
End Function
Shared Sub Main(ByVal args() As String)
' Create a temporary file on disk.
Dim tempFile As String = Path.GetTempFileName()
' Write random data to the temporary file.
Using fileStream = File.OpenWrite(tempFile)
Dim rand As New Random()
Dim buffer(1023) As Byte
For i As Integer = 0 To 511
rand.NextBytes(buffer)
fileStream.Write(buffer, 0, buffer.Length)
Next i
End Using
' Create an ActionBlock<int> object that prints to the console
' the number of bytes read.
Dim printResult = New ActionBlock(Of Integer)(Sub(zeroBytesRead) Console.WriteLine("{0} contains {1} zero bytes.", Path.GetFileName(tempFile), zeroBytesRead))
' Create a TransformBlock<string, int> object that calls the
' CountBytes function and returns its result.
Dim countBytes = New TransformBlock(Of String, Integer)(New Func(Of String, Integer)(AddressOf DataflowExecutionBlocks.CountBytes))
' Link the TransformBlock<string, int> object to the
' ActionBlock<int> object.
countBytes.LinkTo(printResult)
' Create a continuation task that completes the ActionBlock<int>
' object when the TransformBlock<string, int> finishes.
countBytes.Completion.ContinueWith(Sub() printResult.Complete())
' Post the path to the temporary file to the
' TransformBlock<string, int> object.
countBytes.Post(tempFile)
' Requests completion of the TransformBlock<string, int> object.
countBytes.Complete()
' Wait for the ActionBlock<int> object to print the message.
printResult.Completion.Wait()
' Delete the temporary file.
File.Delete(tempFile)
End Sub
End Class
' Sample output:
'tmp4FBE.tmp contains 2081 zero bytes.
'
Embora você possa fornecer uma expressão lambda para um objeto TransformBlock<TInput,TOutput>, este exemplo usa Func<T,TResult> para habilitar outro código para usar o método CountBytes
. O objeto ActionBlock<TInput> usa uma expressão lambda porque o trabalho a ser executado é específico dessa tarefa e provavelmente não será útil para outro código. Para obter mais informações sobre como as expressões lambda funcionam na Biblioteca de paralelismo de tarefas, confira Expressões lambda no PLINQ e TPL.
A seção Resumo de tipos de representantes no documento Fluxo de dados resume os tipos de representantes que você pode oferecer aos objetos ActionBlock<TInput>, TransformBlock<TInput,TOutput> e TransformManyBlock<TInput,TOutput>. A tabela também especifica se o tipo de representante opera de forma síncrona ou assíncrona.
Programação robusta
Este exemplo fornece um representante do tipo Func<T,TResult> ao objeto TransformBlock<TInput,TOutput> para executar a tarefa do bloco de fluxo de dados de forma síncrona. Para habilitar o bloco de fluxo de dados para se comportar de forma assíncrona, forneça um representante do tipo Func<T, Task<TResult>>
ao bloco de fluxo de dados. Quando um bloco de fluxo de dados se comporta de forma assíncrona, a tarefa do bloco de fluxo de dados só será concluída quando o objeto Task<TResult>retornado for concluído. O exemplo a seguir modifica o método CountBytes
e usa os operadores async e await (Async e Await em Visual Basic) para calcular de forma assíncrona o número total de bytes que tem o valor zero no arquivo fornecido. O método ReadAsync executa operações de leitura de arquivo de forma assíncrona.
// Asynchronously computes the number of zero bytes that the provided file
// contains.
static async Task<int> CountBytesAsync(string path)
{
byte[] buffer = new byte[1024];
int totalZeroBytesRead = 0;
using (var fileStream = new FileStream(
path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
{
int bytesRead = 0;
do
{
// Asynchronously read from the file stream.
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
totalZeroBytesRead += buffer.Count(b => b == 0);
} while (bytesRead > 0);
}
return totalZeroBytesRead;
}
' Asynchronously computes the number of zero bytes that the provided file
' contains.
Private Shared async Function CountBytesAsync(ByVal path As String) As Task(Of Integer)
Dim buffer(1023) As Byte
Dim totalZeroBytesRead As Integer = 0
Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
Dim bytesRead As Integer = 0
Do
' Asynchronously read from the file stream.
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
totalZeroBytesRead += buffer.Count(Function(b) b = 0)
Loop While bytesRead > 0
End Using
Return totalZeroBytesRead
End Function
Você também pode usar expressões lambda assíncronas para executar a ação em um bloco de fluxo de dados de execução. O exemplo a seguir modifica o objeto TransformBlock<TInput,TOutput> usado no exemplo anterior para que ele use uma expressão lambda para executar o trabalho de forma assíncrona.
// Create a TransformBlock<string, int> object that calls the
// CountBytes function and returns its result.
var countBytesAsync = new TransformBlock<string, int>(async path =>
{
byte[] buffer = new byte[1024];
int totalZeroBytesRead = 0;
using (var fileStream = new FileStream(
path, FileMode.Open, FileAccess.Read, FileShare.Read, 0x1000, true))
{
int bytesRead = 0;
do
{
// Asynchronously read from the file stream.
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length);
totalZeroBytesRead += buffer.Count(b => b == 0);
} while (bytesRead > 0);
}
return totalZeroBytesRead;
});
' Create a TransformBlock<string, int> object that calls the
' CountBytes function and returns its result.
Dim countBytesAsync = New TransformBlock(Of String, Integer)(async Function(path)
' Asynchronously read from the file stream.
Dim buffer(1023) As Byte
Dim totalZeroBytesRead As Integer = 0
Using fileStream = New FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read, &H1000, True)
Dim bytesRead As Integer = 0
Do
bytesRead = await fileStream.ReadAsync(buffer, 0, buffer.Length)
totalZeroBytesRead += buffer.Count(Function(b) b = 0)
Loop While bytesRead > 0
End Using
Return totalZeroBytesRead
End Function)