How to: Use Arrays of Blocking Collections in a Pipeline
The following example shows how to use arrays of System.Collections.Concurrent.BlockingCollection<T> objects with static methods such as TryAddToAny and TryTakeFromAny to implement fast and flexible data transfer between components.
Example
The following example demonstrates a basic pipeline implementation in which each filter object concurrently takes data from its input collections, transforms it, and passes it to its output collections. The example is shown first in C# and then in Visual Basic.
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
class PipeLineDemo
{
/// <summary>
/// Builds and runs a three-stage pipeline over arrays of blocking collections.
/// Five bounded source collections are filled in parallel with the integers
/// 0..2499, then three <c>PipelineFilter</c> stages are run concurrently:
/// int -> decimal, decimal -> string, and string -> console.
/// Pressing 'c' while the pipeline is running requests cancellation.
/// </summary>
public static void Main()
{
    CancellationTokenSource cts = new CancellationTokenSource();

    // Start up a UI thread for cancellation.
    Task.Run(() =>
    {
        if (Console.ReadKey(true).KeyChar == 'c')
        {
            try
            {
                cts.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // The pipeline already ran to the end and the source was disposed
                // in the finally block below, so there is nothing left to cancel.
            }
        }
    });

    // Generate some source data.
    BlockingCollection<int>[] sourceArrays = new BlockingCollection<int>[5];
    for (int i = 0; i < sourceArrays.Length; i++)
    {
        sourceArrays[i] = new BlockingCollection<int>(500);
    }

    // The number of items equals the total bounded capacity (5 * 500), so
    // every item fits into one of the collections.
    Parallel.For(0, sourceArrays.Length * 500, (j) =>
    {
        int k = BlockingCollection<int>.TryAddToAny(sourceArrays, j);
        if (k >= 0)
        {
            Console.WriteLine("added {0} to source data", j);
        }
    });

    // Mark the sources complete so the first filter knows when to stop.
    foreach (var arr in sourceArrays)
    {
        arr.CompleteAdding();
    }

    // First filter accepts the ints, keeps back a small percentage
    // as a processing fee, and converts the results to decimals.
    var filter1 = new PipelineFilter<int, decimal>
    (
        sourceArrays,
        (n) => Convert.ToDecimal(n * 0.97),
        cts.Token,
        "filter1"
    );

    // Second filter accepts the decimals and converts them to
    // System.Strings.
    var filter2 = new PipelineFilter<decimal, string>
    (
        filter1.m_output,
        (s) => String.Format("{0}", s),
        cts.Token,
        "filter2"
    );

    // Third filter uses the constructor with an Action<T>
    // that renders its output to the screen,
    // not a blocking collection.
    var filter3 = new PipelineFilter<string, string>
    (
        filter2.m_output,
        (s) => Console.WriteLine("The final result is {0}", s),
        cts.Token,
        "filter3"
    );

    // Start up the pipeline!
    try
    {
        Parallel.Invoke(
            () => filter1.Run(),
            () => filter2.Run(),
            () => filter3.Run()
        );
    }
    catch (AggregateException ae)
    {
        // Parallel.Invoke wraps whatever the filters threw (including
        // cancellation) in an AggregateException.
        foreach (var ex in ae.InnerExceptions)
        {
            Console.WriteLine(ex.Message + ex.StackTrace);
        }
    }
    finally
    {
        cts.Dispose();
    }

    // You will need to press twice if you ran to the end:
    // once for the cancellation thread, and once for this thread.
    Console.WriteLine("Press any key.");
    Console.ReadKey(true);
}
/// <summary>
/// One stage of a pipeline. A filter repeatedly takes an item from any of its
/// input collections and either transforms it and adds the result to any of
/// its output collections, or (for an endpoint) hands it to a rendering action.
/// </summary>
/// <typeparam name="TInput">Type of the items taken from the input collections.</typeparam>
/// <typeparam name="TOutput">Type of the items added to the output collections.</typeparam>
class PipelineFilter<TInput, TOutput>
{
    // Shape of the output side of a transforming filter.
    private const int OutputCollectionCount = 5;
    private const int OutputCollectionCapacity = 500;

    Func<TInput, TOutput> m_processor = null;
    public BlockingCollection<TInput>[] m_input;

    // Stays null for an endpoint filter; Run() uses that to pick its mode.
    public BlockingCollection<TOutput>[] m_output = null;
    Action<TInput> m_outputProcessor = null;
    CancellationToken m_token;

    /// <summary>Display name used in the console messages written by <see cref="Run"/>.</summary>
    public string Name { get; private set; }

    /// <summary>
    /// Creates a transforming filter that owns a fresh array of bounded output collections.
    /// </summary>
    /// <param name="input">Collections to take items from.</param>
    /// <param name="processor">Transformation applied to every item taken.</param>
    /// <param name="token">Token observed while taking and adding items.</param>
    /// <param name="name">Display name of the filter.</param>
    public PipelineFilter(
        BlockingCollection<TInput>[] input,
        Func<TInput, TOutput> processor,
        CancellationToken token,
        string name)
    {
        m_input = input;
        m_output = new BlockingCollection<TOutput>[OutputCollectionCount];
        for (int i = 0; i < m_output.Length; i++)
        {
            m_output[i] = new BlockingCollection<TOutput>(OutputCollectionCapacity);
        }

        m_processor = processor;
        m_token = token;
        Name = name;
    }

    // Use this constructor for the final endpoint, which does
    // something like write to file or screen, instead of
    // pushing to another pipeline filter.
    /// <summary>Creates an endpoint filter that has no output collections.</summary>
    /// <param name="input">Collections to take items from.</param>
    /// <param name="renderer">Action invoked with every item taken.</param>
    /// <param name="token">Token observed while taking items.</param>
    /// <param name="name">Display name of the filter.</param>
    public PipelineFilter(
        BlockingCollection<TInput>[] input,
        Action<TInput> renderer,
        CancellationToken token,
        string name)
    {
        m_input = input;
        m_outputProcessor = renderer;
        m_token = token;
        Name = name;
    }

    /// <summary>
    /// Processes items until every input collection is completed or cancellation
    /// is requested. The output collections, if any, are always marked complete
    /// for adding when this method exits, including when it exits by exception.
    /// </summary>
    /// <exception cref="OperationCanceledException">
    /// The token was canceled while the filter was waiting to take or add an item.
    /// </exception>
    public void Run()
    {
        Console.WriteLine("{0} is running", this.Name);
        try
        {
            while (!m_input.All(bc => bc.IsCompleted) && !m_token.IsCancellationRequested)
            {
                TInput receivedItem;
                // Wait at most 50 ms so the loop condition is re-evaluated regularly.
                int i = BlockingCollection<TInput>.TryTakeFromAny(
                    m_input, out receivedItem, 50, m_token);
                if (i < 0)
                {
                    Console.WriteLine("Unable to retrieve data from previous filter");
                    continue;
                }

                if (m_output != null) // we pass data to another blocking collection
                {
                    TOutput outputItem = m_processor(receivedItem);
                    // Pass the token so that a producer blocked on full output
                    // collections can still be canceled.
                    BlockingCollection<TOutput>.AddToAny(m_output, outputItem, m_token);
                    Console.WriteLine("{0} sent {1} to next", this.Name, outputItem);
                }
                else // we're an endpoint
                {
                    m_outputProcessor(receivedItem);
                }
            }
        }
        finally
        {
            // Complete the outputs on every exit path. If this filter faulted
            // without doing so, the next filter would never see its inputs
            // complete and would poll them forever.
            if (m_output != null)
            {
                foreach (var bc in m_output)
                {
                    bc.CompleteAdding();
                }
            }
        }
    }
}
}
Imports System.Collections
Imports System.Collections.Concurrent
Imports System.Collections.Generic
Imports System.Linq
Imports System.Text
Imports System.Threading
Imports System.Threading.Tasks
Namespace BlockingCollectionPipeline
''' <summary>
''' Demonstrates a three-stage pipeline built on arrays of blocking collections.
''' </summary>
Class PipeLineDemo
    ''' <summary>
    ''' Fills five bounded source collections in parallel with the integers
    ''' 0..2499 and then runs three PipelineFilter stages concurrently:
    ''' Integer -> Decimal, Decimal -> String, and String -> console.
    ''' Pressing "c" while the pipeline is running requests cancellation.
    ''' </summary>
    Public Shared Sub Main()
        Dim cts As New CancellationTokenSource()

        ' Start up a UI thread for cancellation.
        Task.Factory.StartNew(Sub()
                                  If Console.ReadKey(True).KeyChar = "c"c Then
                                      Try
                                          cts.Cancel()
                                      Catch ex As ObjectDisposedException
                                          ' The pipeline already ran to the end and the source was
                                          ' disposed in the Finally block below; nothing to cancel.
                                      End Try
                                  End If
                              End Sub)

        ' Generate some source data.
        ' The ReDim argument is the upper bound, so 4 yields five collections.
        Dim sourceArrays() As BlockingCollection(Of Integer)
        ReDim sourceArrays(4)
        For i As Integer = 0 To sourceArrays.Length - 1
            sourceArrays(i) = New BlockingCollection(Of Integer)(500)
        Next

        ' The number of items equals the total bounded capacity (5 * 500), so
        ' every item fits into one of the collections.
        Parallel.For(0, sourceArrays.Length * 500, Sub(j)
                                                       Dim k = BlockingCollection(Of Integer).TryAddToAny(sourceArrays, j)
                                                       If (k >= 0) Then
                                                           Console.WriteLine("added {0} to source data", j)
                                                       End If
                                                   End Sub)

        ' Mark the sources complete so the first filter knows when to stop.
        For Each arr In sourceArrays
            arr.CompleteAdding()
        Next

        ' First filter accepts the ints, keeps back a small percentage
        ' as a processing fee, and converts the results to decimals.
        Dim filter1 = New PipelineFilter(Of Integer, Decimal)(
            sourceArrays,
            Function(n)
                Return Convert.ToDecimal(n * 0.97)
            End Function,
            cts.Token,
            "filter1"
        )

        ' Second filter accepts the decimals and converts them to
        ' System.Strings.
        Dim filter2 = New PipelineFilter(Of Decimal, String)(
            filter1.m_output,
            Function(s) (String.Format("{0}", s)),
            cts.Token,
            "filter2"
        )

        ' Third filter uses the constructor with an Action<T>
        ' that renders its output to the screen,
        ' not a blocking collection.
        Dim filter3 = New PipelineFilter(Of String, String)(
            filter2.m_output,
            Sub(s) Console.WriteLine("The final result is {0}", s),
            cts.Token,
            "filter3"
        )

        ' Start up the pipeline!
        Try
            Parallel.Invoke(
                Sub() filter1.Run(),
                Sub() filter2.Run(),
                Sub() filter3.Run()
            )
        Catch ae As AggregateException
            ' Parallel.Invoke wraps whatever the filters threw (including
            ' cancellation) in an AggregateException.
            For Each ex In ae.InnerExceptions
                Console.WriteLine(ex.Message & ex.StackTrace)
            Next
        Finally
            cts.Dispose()
        End Try

        ' You will need to press twice if you ran to the end:
        ' once for the cancellation thread, and once for this thread.
        Console.WriteLine("Press any key.")
        Console.ReadKey(True)
    End Sub
End Class
''' <summary>
''' One stage of a pipeline. A filter repeatedly takes an item from any of its
''' input collections and either transforms it and adds the result to any of
''' its output collections, or (for an endpoint) hands it to a rendering action.
''' </summary>
''' <typeparam name="TInput">Type of the items taken from the input collections.</typeparam>
''' <typeparam name="TOutput">Type of the items added to the output collections.</typeparam>
Class PipelineFilter(Of TInput, TOutput)
    ' Shape of the output side of a transforming filter.
    Private Const OutputCollectionCount As Integer = 5
    Private Const OutputCollectionCapacity As Integer = 500

    Private m_processor As Func(Of TInput, TOutput) = Nothing
    Public m_input() As BlockingCollection(Of TInput) = Nothing
    ' Stays Nothing for an endpoint filter; Run() uses that to pick its mode.
    Public m_output() As BlockingCollection(Of TOutput) = Nothing
    Private m_outputProcessor As Action(Of TInput) = Nothing
    Private m_token As CancellationToken
    ' Display name used in the console messages written by Run().
    Public Name As String

    ''' <summary>
    ''' Creates a transforming filter that owns a fresh array of bounded output collections.
    ''' </summary>
    ''' <param name="input">Collections to take items from.</param>
    ''' <param name="processor">Transformation applied to every item taken.</param>
    ''' <param name="token">Token observed while taking and adding items.</param>
    ''' <param name="_name">Display name of the filter.</param>
    Public Sub New(ByVal input() As BlockingCollection(Of TInput),
                   ByVal processor As Func(Of TInput, TOutput),
                   ByVal token As CancellationToken,
                   ByVal _name As String)
        m_input = input

        ' ReDim takes the upper bound, hence "count - 1".
        ReDim m_output(OutputCollectionCount - 1)
        For i As Integer = 0 To m_output.Length - 1
            m_output(i) = New BlockingCollection(Of TOutput)(OutputCollectionCapacity)
        Next

        ' Assigned once, after the loop, rather than on every iteration.
        m_processor = processor
        m_token = token
        Name = _name
    End Sub

    ' Use this constructor for the final endpoint, which does
    ' something like write to file or screen, instead of
    ' pushing to another pipeline filter.
    ''' <summary>Creates an endpoint filter that has no output collections.</summary>
    ''' <param name="input">Collections to take items from.</param>
    ''' <param name="renderer">Action invoked with every item taken.</param>
    ''' <param name="token">Token observed while taking items.</param>
    ''' <param name="_name">Display name of the filter.</param>
    Public Sub New(ByVal input() As BlockingCollection(Of TInput),
                   ByVal renderer As Action(Of TInput),
                   ByVal token As CancellationToken,
                   ByVal _name As String)
        m_input = input
        m_outputProcessor = renderer
        m_token = token
        Name = _name
    End Sub

    ''' <summary>
    ''' Processes items until every input collection is completed or cancellation
    ''' is requested. The output collections, if any, are always marked complete
    ''' for adding when this method exits, including when it exits by exception.
    ''' </summary>
    ''' <exception cref="OperationCanceledException">
    ''' The token was canceled while the filter was waiting to take or add an item.
    ''' </exception>
    Public Sub Run()
        Console.WriteLine("{0} is running", Me.Name)
        Try
            ' AndAlso short-circuits: the token is not consulted once all inputs are done.
            While (Not m_input.All(Function(bc) bc.IsCompleted)) AndAlso (Not m_token.IsCancellationRequested)
                Dim receivedItem As TInput = Nothing
                ' Wait at most 50 ms so the loop condition is re-evaluated regularly.
                Dim i As Integer = BlockingCollection(Of TInput).TryTakeFromAny(
                    m_input, receivedItem, 50, m_token)
                If (i >= 0) Then
                    If (m_output IsNot Nothing) Then ' we pass data to another blocking collection
                        Dim outputItem As TOutput = m_processor(receivedItem)
                        ' Pass the token so that a producer blocked on full output
                        ' collections can still be canceled.
                        BlockingCollection(Of TOutput).AddToAny(m_output, outputItem, m_token)
                        Console.WriteLine("{0} sent {1} to next", Me.Name, outputItem)
                    Else ' we're an endpoint
                        m_outputProcessor(receivedItem)
                    End If
                Else
                    Console.WriteLine("Unable to retrieve data from previous filter")
                End If
            End While
        Finally
            ' Complete the outputs on every exit path. If this filter faulted
            ' without doing so, the next filter would never see its inputs
            ' complete and would poll them forever.
            If (m_output IsNot Nothing) Then
                For Each bc In m_output
                    bc.CompleteAdding()
                Next
            End If
        End Try
    End Sub
End Class
End Namespace
See also
Collaborate with us on GitHub
The source for this content can be found on GitHub, where you can also create and review issues and pull requests. For more information, see our contributor guide.