方法: パイプラインでブロッキング コレクションの配列を使用する
次の例は、System.Collections.Concurrent.BlockingCollection<T> オブジェクトの配列を TryAddToAny や TryTakeFromAny などの静的メソッドと共に使用して、コンポーネント間の高速で柔軟性のあるデータ転送を実装する方法を示しています。
使用例
次の例は、各オブジェクトが入力コレクションからデータを同時に取得して変換し、出力コレクションに渡す基本的なパイプラインの実装を示しています。
Imports System
Imports System.Collections
Imports System.Collections.Concurrent
Imports System.Collections.Generic
Imports System.Linq
Imports System.Text
Imports System.Threading
Imports System.Threading.Tasks
Namespace BlockingCollectionPipeline
Class PipeLineDemo
Public Shared Sub Main()
Dim cts As CancellationTokenSource = New CancellationTokenSource()
' Start up a UI thread for cancellation.
Task.Factory.StartNew(Sub()
If (Console.ReadKey().KeyChar = "c"c) Then
cts.Cancel()
End If
End Sub)
'Generate some source data.
Dim sourceArrays() As BlockingCollection(Of Integer)
ReDim sourceArrays(5)
For i As Integer = 0 To sourceArrays.Length - 1
sourceArrays(i) = New BlockingCollection(Of Integer)(500)
Next
Parallel.For(0, sourceArrays.Length * 500, Sub(j)
Dim k = BlockingCollection(Of Integer).TryAddToAny(sourceArrays, j)
If (k >= 0) Then
Console.WriteLine("added {0} to source data", j)
End If
End Sub)
For Each arr In sourceArrays
arr.CompleteAdding()
Next
' First filter accepts the ints, keeps back a small percentage
' as a processing fee, and converts the results to decimals.
Dim filter1 = New PipelineFilter(Of Integer, Decimal)(
sourceArrays,
Function(n)
Return Convert.ToDecimal(n * 0.97)
End Function,
cts.Token,
"filter1"
)
' Second filter accepts the decimals and converts them to
' System.Strings.
Dim filter2 = New PipelineFilter(Of Decimal, String)(
filter1.m_output,
Function(s) (String.Format("{0}", s)),
cts.Token,
"filter2"
)
' Third filter uses the constructor with an Action<T>
' that renders its output to the screen,
' not a blocking collection.
Dim filter3 = New PipelineFilter(Of String, String)(
filter2.m_output,
Sub(s) Console.WriteLine("The final result is {0}", s),
cts.Token,
"filter3"
)
' Start up the pipeline!
Try
Parallel.Invoke(
Sub() filter1.Run(),
Sub() filter2.Run(),
Sub() filter3.Run()
)
Catch ae As AggregateException
For Each ex In ae.InnerExceptions
Console.WriteLine(ex.Message + ex.StackTrace)
Next
End Try
' You will need to press twice if you ran to the end:
' once for the cancellation thread, and once for this thread.
Console.WriteLine("Press any key.")
Console.ReadKey()
End Sub
End Class
class PipelineFilter(Of TInput, TOutput)
Private m_processor As Func(Of TInput, TOutput) = Nothing
Public m_input() As BlockingCollection(Of TInput) = Nothing
Public m_output() As BlockingCollection(Of TOutput) = Nothing
Private m_outputProcessor As Action(Of TInput) = Nothing
Private m_token As CancellationToken
Public Name As String
Public Sub New(ByVal input() As BlockingCollection(Of TInput),
ByVal processor As Func(Of TInput, TOutput),
ByVal token As CancellationToken,
ByVal name As String)
m_input = input
' m_output = New BlockingCollection(Of TOutput)()
ReDim m_output(5)
For i As Integer = 0 To m_output.Length - 1
m_output(i) = New BlockingCollection(Of TOutput)(500)
m_processor = processor
m_token = token
name = name
Next
End Sub
' Use this constructor for the final endpoint, which does
' something like write to file or screen, instead of
' pushing to another pipeline filter.
Public Sub New(ByVal input() As BlockingCollection(Of TInput),
ByVal renderer As Action(Of TInput),
ByVal token As CancellationToken,
ByVal name As String)
m_input = input
m_outputProcessor = renderer
m_token = token
name = name
End Sub
Public Sub Run()
Console.WriteLine("{0} is running", Me.Name)
While ((m_input.All(Function(bc) bc.IsCompleted) = False) And m_token.IsCancellationRequested = False)
Dim receivedItem As TInput
Dim i As Integer = BlockingCollection(Of TInput).TryTakeFromAny(
m_input, receivedItem, 50, m_token)
If (i >= 0) Then
If (Not m_output Is Nothing) Then ' we pass data to another blocking collection
Dim outputItem As TOutput = m_processor(receivedItem)
BlockingCollection(Of TOutput).AddToAny(m_output, outputItem)
Console.WriteLine("{0} sent{1} to next", Me.Name, outputItem)
Else ' we're an endpoint
m_outputProcessor(receivedItem)
End If
else
Console.WriteLine("Unable to retrieve data from previous filter")
End If
End While
If (Not m_output Is Nothing) Then
For Each bc In m_output
bc.CompleteAdding()
Next
End If
End Sub
End Class
End Namespace
namespace BlockingCollectionPipeline
{
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
class PipeLineDemo
{
public static void Main()
{
CancellationTokenSource cts = new CancellationTokenSource();
// Start up a UI thread for cancellation.
Task.Factory.StartNew(() =>
{
if(Console.ReadKey().KeyChar == 'c')
cts.Cancel();
});
//Generate some source data.
BlockingCollection<int>[] sourceArrays = new BlockingCollection<int>[5];
for(int i = 0; i < sourceArrays.Length; i++)
sourceArrays[i] = new BlockingCollection<int>(500);
Parallel.For(0, sourceArrays.Length * 500, (j) =>
{
int k = BlockingCollection<int>.TryAddToAny(sourceArrays, j);
if(k >=0)
Console.WriteLine("added {0} to source data", j);
});
foreach (var arr in sourceArrays)
arr.CompleteAdding();
// First filter accepts the ints, keeps back a small percentage
// as a processing fee, and converts the results to decimals.
var filter1 = new PipelineFilter<int, decimal>
(
sourceArrays,
(n) => Convert.ToDecimal(n * 0.97),
cts.Token,
"filter1"
);
// Second filter accepts the decimals and converts them to
// System.Strings.
var filter2 = new PipelineFilter<decimal, string>
(
filter1.m_output,
(s) => String.Format("{0}", s),
cts.Token,
"filter2"
);
// Third filter uses the constructor with an Action<T>
// that renders its output to the screen,
// not a blocking collection.
var filter3 = new PipelineFilter<string, string>
(
filter2.m_output,
(s) => Console.WriteLine("The final result is {0}", s),
cts.Token,
"filter3"
);
// Start up the pipeline!
try
{
Parallel.Invoke(
() => filter1.Run(),
() => filter2.Run(),
() => filter3.Run()
);
}
catch (AggregateException ae)
{
foreach(var ex in ae.InnerExceptions)
Console.WriteLine(ex.Message + ex.StackTrace);
}
// You will need to press twice if you ran to the end:
// once for the cancellation thread, and once for this thread.
Console.WriteLine("Press any key.");
Console.ReadKey();
}
class PipelineFilter<TInput, TOutput>
{
Func<TInput, TOutput> m_processor = null;
public BlockingCollection<TInput>[] m_input;
public BlockingCollection<TOutput>[] m_output = null;
Action<TInput> m_outputProcessor = null;
CancellationToken m_token;
public string Name { get; private set; }
public PipelineFilter(
BlockingCollection<TInput>[] input,
Func<TInput, TOutput> processor,
CancellationToken token,
string name)
{
m_input = input;
m_output = new BlockingCollection<TOutput>[5];
for (int i = 0; i < m_output.Length; i++)
m_output[i] = new BlockingCollection<TOutput>(500);
m_processor = processor;
m_token = token;
Name = name;
}
// Use this constructor for the final endpoint, which does
// something like write to file or screen, instead of
// pushing to another pipeline filter.
public PipelineFilter(
BlockingCollection<TInput>[] input,
Action<TInput> renderer,
CancellationToken token,
string name)
{
m_input = input;
m_outputProcessor = renderer;
m_token = token;
Name = name;
}
public void Run()
{
Console.WriteLine("{0} is running", this.Name);
while (!m_input.All(bc => bc.IsCompleted) && !m_token.IsCancellationRequested)
{
TInput receivedItem;
int i = BlockingCollection<TInput>.TryTakeFromAny(
m_input, out receivedItem, 50, m_token);
if ( i >= 0)
{
if (m_output != null) // we pass data to another blocking collection
{
TOutput outputItem = m_processor(receivedItem);
BlockingCollection<TOutput>.AddToAny(m_output, outputItem);
Console.WriteLine("{0} sent {1} to next", this.Name, outputItem);
}
else // we're an endpoint
{
m_outputProcessor(receivedItem);
}
}
else
Console.WriteLine("Unable to retrieve data from previous filter");
}
if (m_output != null)
{
foreach (var bc in m_output) bc.CompleteAdding();
}
}
}
}
}