Postupy: Použití polí blokujících kolekcí v datovém kanálu
Následující příklad ukazuje, jak pomocí polí objektů System.Collections.Concurrent.BlockingCollection<T> a statických metod, jako jsou TryAddToAny a TryTakeFromAny, implementovat rychlý a flexibilní přenos dat mezi komponentami.
Příklad
Následující příklad ukazuje základní implementaci kanálu, ve které každý objekt současně přebírá data ze vstupní kolekce, transformuje je a předává do výstupní kolekce.
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
class PipeLineDemo
{
/// <summary>
/// Fills an array of bounded blocking collections with integers, wires three
/// pipeline filters together (int -> decimal -> string -> console) and runs
/// them in parallel until the data is drained or the user presses 'c'.
/// </summary>
public static void Main()
{
    CancellationTokenSource cts = new CancellationTokenSource();

    // Start a background task that cancels the pipeline when 'c' is pressed.
    Task.Run(() =>
    {
        if (Console.ReadKey(true).KeyChar == 'c')
        {
            try
            {
                cts.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // The pipeline already ran to the end and the finally block
                // below disposed cts, so there is nothing left to cancel.
            }
        }
    });

    // Generate some source data.
    BlockingCollection<int>[] sourceArrays = new BlockingCollection<int>[5];
    for (int i = 0; i < sourceArrays.Length; i++)
    {
        sourceArrays[i] = new BlockingCollection<int>(500);
    }

    Parallel.For(0, sourceArrays.Length * 500, (j) =>
    {
        // TryAddToAny returns the index of the collection that took the
        // item, or a negative value when none of them accepted it.
        int k = BlockingCollection<int>.TryAddToAny(sourceArrays, j);
        if (k >= 0)
        {
            Console.WriteLine("added {0} to source data", j);
        }
    });

    // Mark the sources complete so the first filter knows when to stop.
    foreach (var arr in sourceArrays)
    {
        arr.CompleteAdding();
    }

    // First filter accepts the ints, keeps back a small percentage
    // as a processing fee, and converts the results to decimals.
    var filter1 = new PipelineFilter<int, decimal>
    (
        sourceArrays,
        (n) => Convert.ToDecimal(n * 0.97),
        cts.Token,
        "filter1"
    );

    // Second filter accepts the decimals and converts them to
    // System.Strings.
    var filter2 = new PipelineFilter<decimal, string>
    (
        filter1.m_output,
        (s) => String.Format("{0}", s),
        cts.Token,
        "filter2"
    );

    // Third filter uses the constructor with an Action<T>
    // that renders its output to the screen,
    // not a blocking collection.
    var filter3 = new PipelineFilter<string, string>
    (
        filter2.m_output,
        (s) => Console.WriteLine("The final result is {0}", s),
        cts.Token,
        "filter3"
    );

    // Start up the pipeline!
    try
    {
        Parallel.Invoke(
            () => filter1.Run(),
            () => filter2.Run(),
            () => filter3.Run()
        );
    }
    catch (AggregateException ae)
    {
        // Parallel.Invoke wraps whatever the filters threw.
        foreach (var ex in ae.InnerExceptions)
        {
            Console.WriteLine(ex.Message + ex.StackTrace);
        }
    }
    finally
    {
        cts.Dispose();
    }

    // You will need to press twice if you ran to the end:
    // once for the cancellation thread, and once for this thread.
    Console.WriteLine("Press any key.");
    Console.ReadKey(true);
}
/// <summary>
/// One stage of the pipeline. It takes items from any collection in
/// <see cref="m_input"/> and either transforms them into
/// <see cref="m_output"/> or, when built as an endpoint, hands them to a
/// rendering action.
/// </summary>
/// <typeparam name="TInput">Type of the items taken from the input collections.</typeparam>
/// <typeparam name="TOutput">Type of the items added to the output collections.</typeparam>
class PipelineFilter<TInput, TOutput>
{
    Func<TInput, TOutput> m_processor = null;
    public BlockingCollection<TInput>[] m_input;
    public BlockingCollection<TOutput>[] m_output = null;
    Action<TInput> m_outputProcessor = null;
    CancellationToken m_token;

    /// <summary>Display name used in the console messages.</summary>
    public string Name { get; private set; }

    /// <summary>
    /// Creates a filter that transforms each input item and forwards the
    /// result to five newly created output collections of capacity 500.
    /// </summary>
    /// <param name="input">Collections to take items from.</param>
    /// <param name="processor">Transformation applied to every item.</param>
    /// <param name="token">Token observed while taking and adding items.</param>
    /// <param name="name">Display name of the filter.</param>
    public PipelineFilter(
        BlockingCollection<TInput>[] input,
        Func<TInput, TOutput> processor,
        CancellationToken token,
        string name)
    {
        m_input = input;
        m_output = new BlockingCollection<TOutput>[5];
        for (int i = 0; i < m_output.Length; i++)
        {
            m_output[i] = new BlockingCollection<TOutput>(500);
        }

        m_processor = processor;
        m_token = token;
        Name = name;
    }

    // Use this constructor for the final endpoint, which does
    // something like write to file or screen, instead of
    // pushing to another pipeline filter.
    /// <param name="input">Collections to take items from.</param>
    /// <param name="renderer">Action invoked for every item taken.</param>
    /// <param name="token">Token observed while taking items.</param>
    /// <param name="name">Display name of the filter.</param>
    public PipelineFilter(
        BlockingCollection<TInput>[] input,
        Action<TInput> renderer,
        CancellationToken token,
        string name)
    {
        m_input = input;
        m_outputProcessor = renderer;
        m_token = token;
        Name = name;
    }

    /// <summary>
    /// Processes items until every input collection is completed or
    /// cancellation is requested, then marks the output collections (if any)
    /// as complete for adding.
    /// </summary>
    /// <exception cref="OperationCanceledException">
    /// The token was canceled while the filter was waiting to take or add an item.
    /// </exception>
    public void Run()
    {
        Console.WriteLine("{0} is running", this.Name);
        try
        {
            while (!m_input.All(bc => bc.IsCompleted) && !m_token.IsCancellationRequested)
            {
                TInput receivedItem;

                // Wait at most 50 ms so the loop condition is re-evaluated regularly.
                int i = BlockingCollection<TInput>.TryTakeFromAny(
                    m_input, out receivedItem, 50, m_token);
                if (i >= 0)
                {
                    if (m_output != null) // we pass data to another blocking collection
                    {
                        TOutput outputItem = m_processor(receivedItem);

                        // Pass the token: the outputs are bounded, so without it a
                        // full set of outputs would block here even after cancellation.
                        BlockingCollection<TOutput>.AddToAny(m_output, outputItem, m_token);
                        Console.WriteLine("{0} sent {1} to next", this.Name, outputItem);
                    }
                    else // we're an endpoint
                    {
                        m_outputProcessor(receivedItem);
                    }
                }
                else
                {
                    Console.WriteLine("Unable to retrieve data from previous filter");
                }
            }
        }
        finally
        {
            // Complete the outputs on every exit path, including exceptions,
            // so the next filter never waits on collections nobody will fill.
            if (m_output != null)
            {
                foreach (var bc in m_output)
                {
                    bc.CompleteAdding();
                }
            }
        }
    }
}
}
Imports System.Collections
Imports System.Collections.Concurrent
Imports System.Collections.Generic
Imports System.Linq
Imports System.Text
Imports System.Threading
Imports System.Threading.Tasks
Namespace BlockingCollectionPipeline
''' <summary>
''' Fills an array of bounded blocking collections with integers, wires three
''' pipeline filters together (Integer -> Decimal -> String -> console) and
''' runs them in parallel until the data is drained or the user presses "c".
''' </summary>
Class PipeLineDemo
    Public Shared Sub Main()
        Dim cts As New CancellationTokenSource()

        ' Start a background task that cancels the pipeline when "c" is pressed.
        Task.Factory.StartNew(Sub()
                                  If (Console.ReadKey().KeyChar = "c"c) Then
                                      Try
                                          cts.Cancel()
                                      Catch ex As ObjectDisposedException
                                          ' The pipeline already ran to the end and the Finally
                                          ' block below disposed cts; nothing is left to cancel.
                                      End Try
                                  End If
                              End Sub)

        ' Generate some source data.
        ' The array bound is the upper index, so 4 yields five collections.
        Dim sourceArrays(4) As BlockingCollection(Of Integer)
        For i As Integer = 0 To sourceArrays.Length - 1
            sourceArrays(i) = New BlockingCollection(Of Integer)(500)
        Next

        Parallel.For(0, sourceArrays.Length * 500, Sub(j)
                                                       ' TryAddToAny returns the index of the collection that took
                                                       ' the item, or a negative value when none accepted it.
                                                       Dim k = BlockingCollection(Of Integer).TryAddToAny(sourceArrays, j)
                                                       If (k >= 0) Then
                                                           Console.WriteLine("added {0} to source data", j)
                                                       End If
                                                   End Sub)

        ' Mark the sources complete so the first filter knows when to stop.
        For Each arr In sourceArrays
            arr.CompleteAdding()
        Next

        ' First filter accepts the ints, keeps back a small percentage
        ' as a processing fee, and converts the results to decimals.
        Dim filter1 = New PipelineFilter(Of Integer, Decimal)(
            sourceArrays,
            Function(n)
                Return Convert.ToDecimal(n * 0.97)
            End Function,
            cts.Token,
            "filter1"
        )

        ' Second filter accepts the decimals and converts them to
        ' System.Strings.
        Dim filter2 = New PipelineFilter(Of Decimal, String)(
            filter1.m_output,
            Function(s) (String.Format("{0}", s)),
            cts.Token,
            "filter2"
        )

        ' Third filter uses the constructor with an Action(Of T)
        ' that renders its output to the screen,
        ' not a blocking collection.
        Dim filter3 = New PipelineFilter(Of String, String)(
            filter2.m_output,
            Sub(s) Console.WriteLine("The final result is {0}", s),
            cts.Token,
            "filter3"
        )

        ' Start up the pipeline!
        Try
            Parallel.Invoke(
                Sub() filter1.Run(),
                Sub() filter2.Run(),
                Sub() filter3.Run()
            )
        Catch ae As AggregateException
            ' Parallel.Invoke wraps whatever the filters threw.
            For Each ex In ae.InnerExceptions
                Console.WriteLine(ex.Message + ex.StackTrace)
            Next
        Finally
            cts.Dispose()
        End Try

        ' You will need to press twice if you ran to the end:
        ' once for the cancellation thread, and once for this thread.
        Console.WriteLine("Press any key.")
        Console.ReadKey()
    End Sub
End Class
''' <summary>
''' One stage of the pipeline. It takes items from any collection in m_input
''' and either transforms them into m_output or, when built as an endpoint,
''' hands them to a rendering action.
''' </summary>
''' <typeparam name="TInput">Type of the items taken from the input collections.</typeparam>
''' <typeparam name="TOutput">Type of the items added to the output collections.</typeparam>
Class PipelineFilter(Of TInput, TOutput)
    Private m_processor As Func(Of TInput, TOutput) = Nothing
    Public m_input() As BlockingCollection(Of TInput) = Nothing
    Public m_output() As BlockingCollection(Of TOutput) = Nothing
    Private m_outputProcessor As Action(Of TInput) = Nothing
    Private m_token As CancellationToken

    ' Display name used in the console messages.
    Public Name As String

    ''' <summary>
    ''' Creates a filter that transforms each input item and forwards the
    ''' result to five newly created output collections of capacity 500.
    ''' </summary>
    ''' <param name="input">Collections to take items from.</param>
    ''' <param name="processor">Transformation applied to every item.</param>
    ''' <param name="token">Token observed while taking and adding items.</param>
    ''' <param name="_name">Display name of the filter.</param>
    Public Sub New(ByVal input() As BlockingCollection(Of TInput),
                   ByVal processor As Func(Of TInput, TOutput),
                   ByVal token As CancellationToken,
                   ByVal _name As String)
        m_input = input

        ' ReDim takes the upper bound, so 4 yields five collections.
        ReDim m_output(4)
        For i As Integer = 0 To m_output.Length - 1
            m_output(i) = New BlockingCollection(Of TOutput)(500)
        Next

        m_processor = processor
        m_token = token
        Name = _name
    End Sub

    ' Use this constructor for the final endpoint, which does
    ' something like write to file or screen, instead of
    ' pushing to another pipeline filter.
    ''' <param name="input">Collections to take items from.</param>
    ''' <param name="renderer">Action invoked for every item taken.</param>
    ''' <param name="token">Token observed while taking items.</param>
    ''' <param name="_name">Display name of the filter.</param>
    Public Sub New(ByVal input() As BlockingCollection(Of TInput),
                   ByVal renderer As Action(Of TInput),
                   ByVal token As CancellationToken,
                   ByVal _name As String)
        m_input = input
        m_outputProcessor = renderer
        m_token = token
        Name = _name
    End Sub

    ''' <summary>
    ''' Processes items until every input collection is completed or
    ''' cancellation is requested, then marks the output collections (if any)
    ''' as complete for adding.
    ''' </summary>
    ''' <exception cref="OperationCanceledException">
    ''' The token was canceled while the filter was waiting to take or add an item.
    ''' </exception>
    Public Sub Run()
        Console.WriteLine("{0} is running", Me.Name)
        Try
            While Not m_input.All(Function(bc) bc.IsCompleted) AndAlso Not m_token.IsCancellationRequested
                Dim receivedItem As TInput = Nothing

                ' Wait at most 50 ms so the loop condition is re-evaluated regularly.
                Dim i As Integer = BlockingCollection(Of TInput).TryTakeFromAny(
                    m_input, receivedItem, 50, m_token)
                If i >= 0 Then
                    If m_output IsNot Nothing Then ' we pass data to another blocking collection
                        Dim outputItem As TOutput = m_processor(receivedItem)

                        ' Pass the token: the outputs are bounded, so without it a
                        ' full set of outputs would block here even after cancellation.
                        BlockingCollection(Of TOutput).AddToAny(m_output, outputItem, m_token)
                        Console.WriteLine("{0} sent {1} to next", Me.Name, outputItem)
                    Else ' we're an endpoint
                        m_outputProcessor(receivedItem)
                    End If
                Else
                    Console.WriteLine("Unable to retrieve data from previous filter")
                End If
            End While
        Finally
            ' Complete the outputs on every exit path, including exceptions,
            ' so the next filter never waits on collections nobody will fill.
            If m_output IsNot Nothing Then
                For Each bc In m_output
                    bc.CompleteAdding()
                Next
            End If
        End Try
    End Sub
End Class
End Namespace
Viz také
Spolupracujte s námi na GitHubu
Zdroj tohoto obsahu najdete na GitHubu, kde můžete také vytvářet a kontrolovat problémy a žádosti o přijetí změn. Další informace najdete v našem průvodci pro přispěvatele.