Postupy: Zápis a čtení zpráv z bloku toku dat
Tento článek popisuje, jak používat knihovnu toku dat TPL (Task Parallel Library) k zápisu zpráv do bloku toku dat a čtení zpráv z bloku toku dat. Knihovna toku dat TPL poskytuje synchronní i asynchronní metody pro zápis zpráv do bloku toku dat a čtení zpráv. Tento článek ukazuje, jak používat System.Threading.Tasks.Dataflow.BufferBlock<T> třídu. Třída BufferBlock<T> ukládat zprávy do vyrovnávací paměti a chová se jako zdroj zpráv i cíl zprávy.
Poznámka:
Knihovna toku dat TPL ( System.Threading.Tasks.Dataflow obor názvů) není distribuována s .NET. Pokud chcete nainstalovat System.Threading.Tasks.Dataflow obor názvů v sadě Visual Studio, otevřete projekt, v nabídce Projekt zvolte Spravovat balíčky NuGet a vyhledejte System.Threading.Tasks.Dataflow
balíček online. Pokud ho chcete nainstalovat pomocí rozhraní příkazového řádku .NET Core, spusťte dotnet add package System.Threading.Tasks.Dataflow
.
Synchronní zápis a čtení
Následující příklad používá metodu Post k zápisu BufferBlock<T> do bloku toku dat a metodu Receive pro čtení ze stejného objektu.
var bufferBlock = new BufferBlock<int>();
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
// Output:
// 0
// 1
// 2
Dim bufferBlock = New BufferBlock(Of Integer)()
' Post several messages to the block.
For i As Integer = 0 To 2
bufferBlock.Post(i)
Next i
' Receive the messages back from the block.
For i As Integer = 0 To 2
Console.WriteLine(bufferBlock.Receive())
Next i
' Output:
' 0
' 1
' 2
Metodu TryReceive můžete také použít ke čtení z bloku toku dat, jak je znázorněno v následujícím příkladu. Metoda TryReceive neblokuje aktuální vlákno a je užitečná, když se občas dotazujete na data.
// Post more messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
while (bufferBlock.TryReceive(out int value))
{
Console.WriteLine(value);
}
// Output:
// 0
// 1
// 2
' Post more messages to the block.
For i As Integer = 0 To 2
bufferBlock.Post(i)
Next i
' Receive the messages back from the block.
Dim value As Integer
Do While bufferBlock.TryReceive(value)
Console.WriteLine(value)
Loop
' Output:
' 0
' 1
' 2
Vzhledem k tomu, že Post metoda funguje synchronně, BufferBlock<T> objekt v předchozích příkladech přijímá všechna data před druhou smyčkou čte data. Následující příklad rozšiřuje první příklad pomocí čtení Task.WhenAll(Task[]) z bloku zpráv a zápisu do bloku zpráv současně. Vzhledem k tomu WhenAll , že očekává všechny asynchronní operace, které se provádějí souběžně, nejsou hodnoty zapsány do objektu BufferBlock<T> v žádném konkrétním pořadí.
// Write to and read from the message block concurrently.
var post01 = Task.Run(() =>
{
bufferBlock.Post(0);
bufferBlock.Post(1);
});
var receive = Task.Run(() =>
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
});
var post2 = Task.Run(() =>
{
bufferBlock.Post(2);
});
await Task.WhenAll(post01, receive, post2);
// Output:
// 0
// 1
// 2
' Write to and read from the message block concurrently.
Dim post01 = Task.Run(Sub()
bufferBlock.Post(0)
bufferBlock.Post(1)
End Sub)
Dim receive = Task.Run(Sub()
For i As Integer = 0 To 2
Console.WriteLine(bufferBlock.Receive())
Next i
End Sub)
Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
Task.WaitAll(post01, receive, post2)
' Output:
' 0
' 1
' 2
Zápis a čtení asynchronně
Následující příklad používá metodu SendAsync k asynchronnímu zápisu do objektu BufferBlock<T> a ReceiveAsync metoda k asynchronnímu čtení ze stejného objektu. Tento příklad používá operátory async a await (Async a Await v jazyce Visual Basic) k asynchronnímu odesílání dat do cílového bloku a čtení dat z cílového bloku. Tato SendAsync metoda je užitečná, když je nutné povolit blok toku dat k odložení zpráv. Tato ReceiveAsync metoda je užitečná, když chcete pracovat s daty, když jsou tato data k dispozici. Další informace o tom, jak se zprávy šíří mezi bloky zpráv, najdete v části Předávání zpráv v toku dat.
// Post more messages to the block asynchronously.
for (int i = 0; i < 3; i++)
{
await bufferBlock.SendAsync(i);
}
// Asynchronously receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(await bufferBlock.ReceiveAsync());
}
// Output:
// 0
// 1
// 2
' Post more messages to the block asynchronously.
For i As Integer = 0 To 2
await bufferBlock.SendAsync(i)
Next i
' Asynchronously receive the messages back from the block.
For i As Integer = 0 To 2
Console.WriteLine(await bufferBlock.ReceiveAsync())
Next i
' Output:
' 0
' 1
' 2
Úplný příklad
Následující příklad ukazuje veškerý kód pro tento článek.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
// Demonstrates a how to write to and read from a dataflow block.
class DataflowReadWrite
{
// Demonstrates asynchronous dataflow operations.
static async Task AsyncSendReceive(BufferBlock<int> bufferBlock)
{
// Post more messages to the block asynchronously.
for (int i = 0; i < 3; i++)
{
await bufferBlock.SendAsync(i);
}
// Asynchronously receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(await bufferBlock.ReceiveAsync());
}
// Output:
// 0
// 1
// 2
}
static async Task Main()
{
var bufferBlock = new BufferBlock<int>();
// Post several messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
// Output:
// 0
// 1
// 2
// Post more messages to the block.
for (int i = 0; i < 3; i++)
{
bufferBlock.Post(i);
}
// Receive the messages back from the block.
while (bufferBlock.TryReceive(out int value))
{
Console.WriteLine(value);
}
// Output:
// 0
// 1
// 2
// Write to and read from the message block concurrently.
var post01 = Task.Run(() =>
{
bufferBlock.Post(0);
bufferBlock.Post(1);
});
var receive = Task.Run(() =>
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine(bufferBlock.Receive());
}
});
var post2 = Task.Run(() =>
{
bufferBlock.Post(2);
});
await Task.WhenAll(post01, receive, post2);
// Output:
// 0
// 1
// 2
// Demonstrate asynchronous dataflow operations.
await AsyncSendReceive(bufferBlock);
}
}
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
' Demonstrates a how to write to and read from a dataflow block.
Friend Class DataflowReadWrite
' Demonstrates asynchronous dataflow operations.
Private Shared async Function AsyncSendReceive(ByVal bufferBlock As BufferBlock(Of Integer)) As Task
' Post more messages to the block asynchronously.
For i As Integer = 0 To 2
await bufferBlock.SendAsync(i)
Next i
' Asynchronously receive the messages back from the block.
For i As Integer = 0 To 2
Console.WriteLine(await bufferBlock.ReceiveAsync())
Next i
' Output:
' 0
' 1
' 2
End Function
Shared Sub Main(ByVal args() As String)
Dim bufferBlock = New BufferBlock(Of Integer)()
' Post several messages to the block.
For i As Integer = 0 To 2
bufferBlock.Post(i)
Next i
' Receive the messages back from the block.
For i As Integer = 0 To 2
Console.WriteLine(bufferBlock.Receive())
Next i
' Output:
' 0
' 1
' 2
' Post more messages to the block.
For i As Integer = 0 To 2
bufferBlock.Post(i)
Next i
' Receive the messages back from the block.
Dim value As Integer
Do While bufferBlock.TryReceive(value)
Console.WriteLine(value)
Loop
' Output:
' 0
' 1
' 2
' Write to and read from the message block concurrently.
Dim post01 = Task.Run(Sub()
bufferBlock.Post(0)
bufferBlock.Post(1)
End Sub)
Dim receive = Task.Run(Sub()
For i As Integer = 0 To 2
Console.WriteLine(bufferBlock.Receive())
Next i
End Sub)
Dim post2 = Task.Run(Sub() bufferBlock.Post(2))
Task.WaitAll(post01, receive, post2)
' Output:
' 0
' 1
' 2
' Demonstrate asynchronous dataflow operations.
AsyncSendReceive(bufferBlock).Wait()
End Sub
End Class
Další kroky
Tento příklad ukazuje, jak číst z bloku zpráv a zapisovat do bloku zpráv přímo. Bloky toku dat můžete také připojit k vytvoření kanálů, což jsou lineární sekvence bloků toku dat nebo sítě, což jsou grafy bloků toku dat. V kanálu nebo síti zdroje asynchronně šíří data do cílů, jakmile jsou tato data dostupná. Příklad, který vytvoří základní kanál toku dat, najdete v části Návod: Vytvoření kanálu toku dat. Příklad, který vytvoří složitější síť toku dat, najdete v tématu Návod: Použití toku dat v aplikaci model Windows Forms.