Walkthrough: Creating a custom dataflow block type
Although the TPL Dataflow Library provides several dataflow block types that enable a variety of functionality, you can also create custom block types. This document describes how to create a dataflow block type that implements custom behavior.
Prerequisites
Read Dataflow before you read this document.
Note
The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, to install it using the .NET Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.
Defining the sliding window dataflow block
Consider a dataflow application that requires input values to be buffered and then output in a sliding window manner. For example, for the input values {0, 1, 2, 3, 4, 5} and a window size of three, a sliding window dataflow block produces the output arrays {0, 1, 2}, {1, 2, 3}, {2, 3, 4}, and {3, 4, 5}. The following sections describe two ways to create a dataflow block type that implements this custom behavior. The first technique uses the Encapsulate method to combine the functionality of an ISourceBlock<TOutput> object and an ITargetBlock<TInput> object into one propagator block. The second technique defines a class that implements the IPropagatorBlock<TInput,TOutput> interface and combines existing functionality to perform the custom behavior.
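Before any dataflow blocks are involved, the windowing rule itself can be sketched over a plain sequence. The following is a minimal illustration, not part of the walkthrough's code: the names SlidingWindowSketch and Windows are hypothetical, and unlike the blocks defined later, this sketch emits only full windows and does not flush a trailing partial window.

```csharp
using System;
using System.Collections.Generic;

public static class SlidingWindowSketch
{
    // Yields every full window of windowSize consecutive items.
    // (Hypothetical helper, used here only to illustrate the rule.)
    public static IEnumerable<T[]> Windows<T>(IEnumerable<T> values, int windowSize)
    {
        var queue = new Queue<T>();
        foreach (T value in values)
        {
            // Add the newest item and drop the oldest once the window is exceeded.
            queue.Enqueue(value);
            if (queue.Count > windowSize)
                queue.Dequeue();

            // Emit a snapshot whenever the queue holds exactly one window.
            if (queue.Count == windowSize)
                yield return queue.ToArray();
        }
    }

    public static void Main()
    {
        // Prints {0, 1, 2}, {1, 2, 3}, {2, 3, 4} and {3, 4, 5}, one per line.
        foreach (int[] window in Windows(new[] { 0, 1, 2, 3, 4, 5 }, 3))
            Console.WriteLine("{" + string.Join(", ", window) + "}");
    }
}
```

The same enqueue, dequeue, and snapshot steps appear inside the ActionBlock<T> delegate in both techniques described in this document.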
Using the Encapsulate method to define the sliding window dataflow block
The following example uses the Encapsulate method to create a propagator block from a target and a source. A propagator block enables a source block and a target block to act together as a receiver and sender of data.
This technique is useful when you need custom dataflow functionality but do not need a type that provides additional methods, properties, or fields.
// Creates an IPropagatorBlock<T, T[]> object that propagates data in a
// sliding window fashion.
public static IPropagatorBlock<T, T[]> CreateSlidingWindow<T>(int windowSize)
{
// Create a queue to hold messages.
var queue = new Queue<T>();
// The source part of the propagator holds arrays of size windowSize
// and propagates data out to any connected targets.
var source = new BufferBlock<T[]>();
// The target part receives data and adds them to the queue.
var target = new ActionBlock<T>(item =>
{
// Add the item to the queue.
queue.Enqueue(item);
// Remove the oldest item when the queue size exceeds the window size.
if (queue.Count > windowSize)
queue.Dequeue();
// Post the data in the queue to the source block when the queue size
// equals the window size.
if (queue.Count == windowSize)
source.Post(queue.ToArray());
});
// When the target is set to the completed state, propagate out any
// remaining data and set the source to the completed state.
target.Completion.ContinueWith(delegate
{
if (queue.Count > 0 && queue.Count < windowSize)
source.Post(queue.ToArray());
source.Complete();
});
// Return an IPropagatorBlock<T, T[]> object that encapsulates the
// target and source blocks.
return DataflowBlock.Encapsulate(target, source);
}
' Creates an IPropagatorBlock<T, T[]> object that propagates data in a
' sliding window fashion.
Public Shared Function CreateSlidingWindow(Of T)(ByVal windowSize As Integer) As IPropagatorBlock(Of T, T())
' Create a queue to hold messages.
Dim queue = New Queue(Of T)()
' The source part of the propagator holds arrays of size windowSize
' and propagates data out to any connected targets.
Dim source = New BufferBlock(Of T())()
' The target part receives data and adds them to the queue.
Dim target = New ActionBlock(Of T)(Sub(item)
' Add the item to the queue.
queue.Enqueue(item)
' Remove the oldest item when the queue size exceeds the window size.
If queue.Count > windowSize Then
queue.Dequeue()
End If
' Post the data in the queue to the source block when the queue size
' equals the window size.
If queue.Count = windowSize Then
source.Post(queue.ToArray())
End If
End Sub)
' When the target is set to the completed state, propagate out any
' remaining data and set the source to the completed state.
target.Completion.ContinueWith(Sub()
If queue.Count > 0 AndAlso queue.Count < windowSize Then
source.Post(queue.ToArray())
End If
source.Complete()
End Sub)
' Return an IPropagatorBlock<T, T[]> object that encapsulates the
' target and source blocks.
Return DataflowBlock.Encapsulate(target, source)
End Function
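The completion continuation in the example above also flushes a partial window when fewer than windowSize items were ever posted. The following sketch illustrates that edge case under stated assumptions: it restates CreateSlidingWindow in condensed form only so that the snippet compiles on its own, and the names PartialWindowSketch and FlushPartial are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class PartialWindowSketch
{
    // Condensed restatement of CreateSlidingWindow from the example above.
    static IPropagatorBlock<T, T[]> CreateSlidingWindow<T>(int windowSize)
    {
        var queue = new Queue<T>();
        var source = new BufferBlock<T[]>();
        var target = new ActionBlock<T>(item =>
        {
            queue.Enqueue(item);
            if (queue.Count > windowSize) queue.Dequeue();
            if (queue.Count == windowSize) source.Post(queue.ToArray());
        });
        target.Completion.ContinueWith(delegate
        {
            // Flush whatever is left when no full window was ever formed.
            if (queue.Count > 0 && queue.Count < windowSize)
                source.Post(queue.ToArray());
            source.Complete();
        });
        return DataflowBlock.Encapsulate(target, source);
    }

    // Posts fewer values than the window size and returns the single
    // partial window that is flushed on completion. (Hypothetical helper.)
    public static T[] FlushPartial<T>(IEnumerable<T> values, int windowSize)
    {
        IPropagatorBlock<T, T[]> block = CreateSlidingWindow<T>(windowSize);
        foreach (T value in values)
            block.Post(value);
        block.Complete();

        // Receive blocks until the continuation posts the partial window.
        return block.Receive();
    }

    public static void Main()
    {
        int[] window = FlushPartial(new[] { 1, 2 }, 3);
        Console.WriteLine(string.Join(",", window)); // prints "1,2"
    }
}
```

Note that the continuation calls source.Complete() regardless of how the target finished, so in this design a fault in the target is not forwarded to the source.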
Implementing IPropagatorBlock to define the sliding window dataflow block
The following example shows the SlidingWindowBlock class. This class implements IPropagatorBlock<TInput,TOutput> so that it can act as both a source and a target of data. As in the previous example, the SlidingWindowBlock class is built on existing dataflow block types. However, the SlidingWindowBlock class also implements the methods that are required by the ISourceBlock<TOutput>, ITargetBlock<TInput>, and IDataflowBlock interfaces. These methods all forward work to members of the predefined dataflow block types. For example, the OfferMessage method, which the Post extension method calls, forwards work to the m_target data member, which is also an ITargetBlock<TInput> object.
This technique is useful when you require custom dataflow functionality and also require a type that provides additional methods, properties, or fields. For example, the SlidingWindowBlock class also implements IReceivableSourceBlock<TOutput> so that it can provide the TryReceive and TryReceiveAll methods. The SlidingWindowBlock class also demonstrates extensibility by providing the WindowSize property, which retrieves the number of elements in the sliding window.
// Propagates data in a sliding window fashion.
public class SlidingWindowBlock<T> : IPropagatorBlock<T, T[]>,
IReceivableSourceBlock<T[]>
{
// The size of the window.
private readonly int m_windowSize;
// The target part of the block.
private readonly ITargetBlock<T> m_target;
// The source part of the block.
private readonly IReceivableSourceBlock<T[]> m_source;
// Constructs a SlidingWindowBlock object.
public SlidingWindowBlock(int windowSize)
{
// Create a queue to hold messages.
var queue = new Queue<T>();
// The source part of the propagator holds arrays of size windowSize
// and propagates data out to any connected targets.
var source = new BufferBlock<T[]>();
// The target part receives data and adds them to the queue.
var target = new ActionBlock<T>(item =>
{
// Add the item to the queue.
queue.Enqueue(item);
// Remove the oldest item when the queue size exceeds the window size.
if (queue.Count > windowSize)
queue.Dequeue();
// Post the data in the queue to the source block when the queue size
// equals the window size.
if (queue.Count == windowSize)
source.Post(queue.ToArray());
});
// When the target is set to the completed state, propagate out any
// remaining data and set the source to the completed state.
target.Completion.ContinueWith(delegate
{
if (queue.Count > 0 && queue.Count < windowSize)
source.Post(queue.ToArray());
source.Complete();
});
m_windowSize = windowSize;
m_target = target;
m_source = source;
}
// Retrieves the size of the window.
public int WindowSize { get { return m_windowSize; } }
#region IReceivableSourceBlock<TOutput> members
// Attempts to synchronously receive an item from the source.
public bool TryReceive(Predicate<T[]> filter, out T[] item)
{
return m_source.TryReceive(filter, out item);
}
// Attempts to remove all available elements from the source into a new
// array that is returned.
public bool TryReceiveAll(out IList<T[]> items)
{
return m_source.TryReceiveAll(out items);
}
#endregion
#region ISourceBlock<TOutput> members
// Links this dataflow block to the provided target.
public IDisposable LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions)
{
return m_source.LinkTo(target, linkOptions);
}
// Called by a target to reserve a message previously offered by a source
// but not yet consumed by this target.
bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target)
{
return m_source.ReserveMessage(messageHeader, target);
}
// Called by a target to consume a previously offered message from a source.
T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target, out bool messageConsumed)
{
return m_source.ConsumeMessage(messageHeader,
target, out messageConsumed);
}
// Called by a target to release a previously reserved message from a source.
void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target)
{
m_source.ReleaseReservation(messageHeader, target);
}
#endregion
#region ITargetBlock<TInput> members
// Asynchronously passes a message to the target block, giving the target the
// opportunity to consume the message.
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader,
T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
return m_target.OfferMessage(messageHeader,
messageValue, source, consumeToAccept);
}
#endregion
#region IDataflowBlock members
// Gets a Task that represents the completion of this dataflow block.
public Task Completion { get { return m_source.Completion; } }
// Signals to this target block that it should not accept any more messages,
// nor consume postponed messages.
public void Complete()
{
m_target.Complete();
}
// Causes this dataflow block to complete in a faulted state.
public void Fault(Exception error)
{
m_target.Fault(error);
}
#endregion
}
' Propagates data in a sliding window fashion.
Public Class SlidingWindowBlock(Of T)
Implements IPropagatorBlock(Of T, T()), IReceivableSourceBlock(Of T())
' The size of the window.
Private ReadOnly m_windowSize As Integer
' The target part of the block.
Private ReadOnly m_target As ITargetBlock(Of T)
' The source part of the block.
Private ReadOnly m_source As IReceivableSourceBlock(Of T())
' Constructs a SlidingWindowBlock object.
Public Sub New(ByVal windowSize As Integer)
' Create a queue to hold messages.
Dim queue = New Queue(Of T)()
' The source part of the propagator holds arrays of size windowSize
' and propagates data out to any connected targets.
Dim source = New BufferBlock(Of T())()
' The target part receives data and adds them to the queue.
Dim target = New ActionBlock(Of T)(Sub(item)
' Add the item to the queue.
queue.Enqueue(item)
' Remove the oldest item when the queue size exceeds the window size.
If queue.Count > windowSize Then
queue.Dequeue()
End If
' Post the data in the queue to the source block when the queue size
' equals the window size.
If queue.Count = windowSize Then
source.Post(queue.ToArray())
End If
End Sub)
' When the target is set to the completed state, propagate out any
' remaining data and set the source to the completed state.
target.Completion.ContinueWith(Sub()
If queue.Count > 0 AndAlso queue.Count < windowSize Then
source.Post(queue.ToArray())
End If
source.Complete()
End Sub)
m_windowSize = windowSize
m_target = target
m_source = source
End Sub
' Retrieves the size of the window.
Public ReadOnly Property WindowSize() As Integer
Get
Return m_windowSize
End Get
End Property
'#Region "IReceivableSourceBlock<TOutput> members"
' Attempts to synchronously receive an item from the source.
Public Function TryReceive(ByVal filter As Predicate(Of T()), <System.Runtime.InteropServices.Out()> ByRef item() As T) As Boolean Implements IReceivableSourceBlock(Of T()).TryReceive
Return m_source.TryReceive(filter, item)
End Function
' Attempts to remove all available elements from the source into a new
' array that is returned.
Public Function TryReceiveAll(<System.Runtime.InteropServices.Out()> ByRef items As IList(Of T())) As Boolean Implements IReceivableSourceBlock(Of T()).TryReceiveAll
Return m_source.TryReceiveAll(items)
End Function
'#End Region
#Region "ISourceBlock<TOutput> members"
' Links this dataflow block to the provided target.
Public Function LinkTo(ByVal target As ITargetBlock(Of T()), ByVal linkOptions As DataflowLinkOptions) As IDisposable Implements ISourceBlock(Of T()).LinkTo
Return m_source.LinkTo(target, linkOptions)
End Function
' Called by a target to reserve a message previously offered by a source
' but not yet consumed by this target.
Private Function ReserveMessage(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T())) As Boolean Implements ISourceBlock(Of T()).ReserveMessage
Return m_source.ReserveMessage(messageHeader, target)
End Function
' Called by a target to consume a previously offered message from a source.
Private Function ConsumeMessage(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T()), ByRef messageConsumed As Boolean) As T() Implements ISourceBlock(Of T()).ConsumeMessage
Return m_source.ConsumeMessage(messageHeader, target, messageConsumed)
End Function
' Called by a target to release a previously reserved message from a source.
Private Sub ReleaseReservation(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T())) Implements ISourceBlock(Of T()).ReleaseReservation
m_source.ReleaseReservation(messageHeader, target)
End Sub
#End Region
#Region "ITargetBlock<TInput> members"
' Asynchronously passes a message to the target block, giving the target the
' opportunity to consume the message.
Private Function OfferMessage(ByVal messageHeader As DataflowMessageHeader, ByVal messageValue As T, ByVal source As ISourceBlock(Of T), ByVal consumeToAccept As Boolean) As DataflowMessageStatus Implements ITargetBlock(Of T).OfferMessage
Return m_target.OfferMessage(messageHeader, messageValue, source, consumeToAccept)
End Function
#End Region
#Region "IDataflowBlock members"
' Gets a Task that represents the completion of this dataflow block.
Public ReadOnly Property Completion() As Task Implements IDataflowBlock.Completion
Get
Return m_source.Completion
End Get
End Property
' Signals to this target block that it should not accept any more messages,
' nor consume postponed messages.
Public Sub Complete() Implements IDataflowBlock.Complete
m_target.Complete()
End Sub
' Causes this dataflow block to complete in a faulted state.
Public Sub Fault(ByVal [error] As Exception) Implements IDataflowBlock.Fault
m_target.Fault([error])
End Sub
#End Region
End Class
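SlidingWindowBlock forwards TryReceive and TryReceiveAll to its inner BufferBlock<T[]>, so their behavior can be seen on a BufferBlock<T[]> directly. The following sketch is illustrative only: the names TryReceiveSketch and Drain are hypothetical, and the windows are posted by hand instead of being produced by a sliding window block.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class TryReceiveSketch
{
    // Buffers three windows and drains them synchronously. (Hypothetical helper.)
    public static List<string> Drain()
    {
        var source = new BufferBlock<int[]>();
        source.Post(new[] { 0, 1, 2 });
        source.Post(new[] { 1, 2, 3 });
        source.Post(new[] { 2, 3, 4 });

        var seen = new List<string>();

        // TryReceive does not wait; a null filter accepts any buffered item.
        if (source.TryReceive(null, out int[] first))
            seen.Add(string.Join(",", first));

        // TryReceiveAll removes everything that is still buffered.
        if (source.TryReceiveAll(out IList<int[]> rest))
        {
            foreach (int[] window in rest)
                seen.Add(string.Join(",", window));
        }

        return seen;
    }

    public static void Main()
    {
        foreach (string window in Drain())
            Console.WriteLine(window);
    }
}
```

Both calls return false when nothing is buffered, which makes them suitable for polling a block without blocking the calling thread.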
The complete example
The following example shows the complete code for this walkthrough. It also demonstrates how to use both sliding window blocks in a method that writes to the block, reads from it, and prints the results to the console.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to create a custom dataflow block type.
class Program
{
// Creates an IPropagatorBlock<T, T[]> object that propagates data in a
// sliding window fashion.
public static IPropagatorBlock<T, T[]> CreateSlidingWindow<T>(int windowSize)
{
// Create a queue to hold messages.
var queue = new Queue<T>();
// The source part of the propagator holds arrays of size windowSize
// and propagates data out to any connected targets.
var source = new BufferBlock<T[]>();
// The target part receives data and adds them to the queue.
var target = new ActionBlock<T>(item =>
{
// Add the item to the queue.
queue.Enqueue(item);
// Remove the oldest item when the queue size exceeds the window size.
if (queue.Count > windowSize)
queue.Dequeue();
// Post the data in the queue to the source block when the queue size
// equals the window size.
if (queue.Count == windowSize)
source.Post(queue.ToArray());
});
// When the target is set to the completed state, propagate out any
// remaining data and set the source to the completed state.
target.Completion.ContinueWith(delegate
{
if (queue.Count > 0 && queue.Count < windowSize)
source.Post(queue.ToArray());
source.Complete();
});
// Return an IPropagatorBlock<T, T[]> object that encapsulates the
// target and source blocks.
return DataflowBlock.Encapsulate(target, source);
}
// Propagates data in a sliding window fashion.
public class SlidingWindowBlock<T> : IPropagatorBlock<T, T[]>,
IReceivableSourceBlock<T[]>
{
// The size of the window.
private readonly int m_windowSize;
// The target part of the block.
private readonly ITargetBlock<T> m_target;
// The source part of the block.
private readonly IReceivableSourceBlock<T[]> m_source;
// Constructs a SlidingWindowBlock object.
public SlidingWindowBlock(int windowSize)
{
// Create a queue to hold messages.
var queue = new Queue<T>();
// The source part of the propagator holds arrays of size windowSize
// and propagates data out to any connected targets.
var source = new BufferBlock<T[]>();
// The target part receives data and adds them to the queue.
var target = new ActionBlock<T>(item =>
{
// Add the item to the queue.
queue.Enqueue(item);
// Remove the oldest item when the queue size exceeds the window size.
if (queue.Count > windowSize)
queue.Dequeue();
// Post the data in the queue to the source block when the queue size
// equals the window size.
if (queue.Count == windowSize)
source.Post(queue.ToArray());
});
// When the target is set to the completed state, propagate out any
// remaining data and set the source to the completed state.
target.Completion.ContinueWith(delegate
{
if (queue.Count > 0 && queue.Count < windowSize)
source.Post(queue.ToArray());
source.Complete();
});
m_windowSize = windowSize;
m_target = target;
m_source = source;
}
// Retrieves the size of the window.
public int WindowSize { get { return m_windowSize; } }
#region IReceivableSourceBlock<TOutput> members
// Attempts to synchronously receive an item from the source.
public bool TryReceive(Predicate<T[]> filter, out T[] item)
{
return m_source.TryReceive(filter, out item);
}
// Attempts to remove all available elements from the source into a new
// array that is returned.
public bool TryReceiveAll(out IList<T[]> items)
{
return m_source.TryReceiveAll(out items);
}
#endregion
#region ISourceBlock<TOutput> members
// Links this dataflow block to the provided target.
public IDisposable LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions)
{
return m_source.LinkTo(target, linkOptions);
}
// Called by a target to reserve a message previously offered by a source
// but not yet consumed by this target.
bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target)
{
return m_source.ReserveMessage(messageHeader, target);
}
// Called by a target to consume a previously offered message from a source.
T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target, out bool messageConsumed)
{
return m_source.ConsumeMessage(messageHeader,
target, out messageConsumed);
}
// Called by a target to release a previously reserved message from a source.
void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
ITargetBlock<T[]> target)
{
m_source.ReleaseReservation(messageHeader, target);
}
#endregion
#region ITargetBlock<TInput> members
// Asynchronously passes a message to the target block, giving the target the
// opportunity to consume the message.
DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader,
T messageValue, ISourceBlock<T> source, bool consumeToAccept)
{
return m_target.OfferMessage(messageHeader,
messageValue, source, consumeToAccept);
}
#endregion
#region IDataflowBlock members
// Gets a Task that represents the completion of this dataflow block.
public Task Completion { get { return m_source.Completion; } }
// Signals to this target block that it should not accept any more messages,
// nor consume postponed messages.
public void Complete()
{
m_target.Complete();
}
// Causes this dataflow block to complete in a faulted state.
public void Fault(Exception error)
{
m_target.Fault(error);
}
#endregion
}
// Demonstrates usage of the sliding window block by sending the provided
// values to the provided propagator block and printing the output of
// that block to the console.
static void DemonstrateSlidingWindow<T>(IPropagatorBlock<T, T[]> slidingWindow,
IEnumerable<T> values)
{
// Create an action block that prints arrays of data to the console.
string windowComma = string.Empty;
var printWindow = new ActionBlock<T[]>(window =>
{
Console.Write(windowComma);
Console.Write("{");
string comma = string.Empty;
foreach (T item in window)
{
Console.Write(comma);
Console.Write(item);
comma = ",";
}
Console.Write("}");
windowComma = ", ";
});
// Link the printer block to the sliding window block.
slidingWindow.LinkTo(printWindow);
// Set the printer block to the completed state when the sliding window
// block completes.
slidingWindow.Completion.ContinueWith(delegate { printWindow.Complete(); });
// Print an additional newline to the console when the printer block completes.
var completion = printWindow.Completion.ContinueWith(delegate { Console.WriteLine(); });
// Post the provided values to the sliding window block and then signal
// that no more values will be posted.
foreach (T value in values)
{
slidingWindow.Post(value);
}
slidingWindow.Complete();
// Wait for the printer to complete and perform its final action.
completion.Wait();
}
static void Main(string[] args)
{
Console.Write("Using the DataflowBlockExtensions.Encapsulate method ");
Console.WriteLine("(T=int, windowSize=3):");
DemonstrateSlidingWindow(CreateSlidingWindow<int>(3), Enumerable.Range(0, 10));
Console.WriteLine();
var slidingWindow = new SlidingWindowBlock<char>(4);
Console.Write("Using SlidingWindowBlock<T> ");
Console.WriteLine("(T=char, windowSize={0}):", slidingWindow.WindowSize);
DemonstrateSlidingWindow(slidingWindow, from n in Enumerable.Range(65, 10)
select (char)n);
}
}
/* Output:
Using the DataflowBlockExtensions.Encapsulate method (T=int, windowSize=3):
{0,1,2}, {1,2,3}, {2,3,4}, {3,4,5}, {4,5,6}, {5,6,7}, {6,7,8}, {7,8,9}
Using SlidingWindowBlock<T> (T=char, windowSize=4):
{A,B,C,D}, {B,C,D,E}, {C,D,E,F}, {D,E,F,G}, {E,F,G,H}, {F,G,H,I}, {G,H,I,J}
*/
Imports System
Imports System.Collections.Generic
Imports System.Linq
Imports System.Threading.Tasks
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to create a custom dataflow block type.
Friend Class Program
' Creates an IPropagatorBlock<T, T[]> object that propagates data in a
' sliding window fashion.
Public Shared Function CreateSlidingWindow(Of T)(ByVal windowSize As Integer) As IPropagatorBlock(Of T, T())
' Create a queue to hold messages.
Dim queue = New Queue(Of T)()
' The source part of the propagator holds arrays of size windowSize
' and propagates data out to any connected targets.
Dim source = New BufferBlock(Of T())()
' The target part receives data and adds them to the queue.
Dim target = New ActionBlock(Of T)(Sub(item)
' Add the item to the queue.
queue.Enqueue(item)
' Remove the oldest item when the queue size exceeds the window size.
If queue.Count > windowSize Then
queue.Dequeue()
End If
' Post the data in the queue to the source block when the queue size
' equals the window size.
If queue.Count = windowSize Then
source.Post(queue.ToArray())
End If
End Sub)
' When the target is set to the completed state, propagate out any
' remaining data and set the source to the completed state.
target.Completion.ContinueWith(Sub()
If queue.Count > 0 AndAlso queue.Count < windowSize Then
source.Post(queue.ToArray())
End If
source.Complete()
End Sub)
' Return an IPropagatorBlock<T, T[]> object that encapsulates the
' target and source blocks.
Return DataflowBlock.Encapsulate(target, source)
End Function
' Propagates data in a sliding window fashion.
Public Class SlidingWindowBlock(Of T)
Implements IPropagatorBlock(Of T, T()), IReceivableSourceBlock(Of T())
' The size of the window.
Private ReadOnly m_windowSize As Integer
' The target part of the block.
Private ReadOnly m_target As ITargetBlock(Of T)
' The source part of the block.
Private ReadOnly m_source As IReceivableSourceBlock(Of T())
' Constructs a SlidingWindowBlock object.
Public Sub New(ByVal windowSize As Integer)
' Create a queue to hold messages.
Dim queue = New Queue(Of T)()
' The source part of the propagator holds arrays of size windowSize
' and propagates data out to any connected targets.
Dim source = New BufferBlock(Of T())()
' The target part receives data and adds them to the queue.
Dim target = New ActionBlock(Of T)(Sub(item)
' Add the item to the queue.
queue.Enqueue(item)
' Remove the oldest item when the queue size exceeds the window size.
If queue.Count > windowSize Then
queue.Dequeue()
End If
' Post the data in the queue to the source block when the queue size
' equals the window size.
If queue.Count = windowSize Then
source.Post(queue.ToArray())
End If
End Sub)
' When the target is set to the completed state, propagate out any
' remaining data and set the source to the completed state.
target.Completion.ContinueWith(Sub()
If queue.Count > 0 AndAlso queue.Count < windowSize Then
source.Post(queue.ToArray())
End If
source.Complete()
End Sub)
m_windowSize = windowSize
m_target = target
m_source = source
End Sub
' Retrieves the size of the window.
Public ReadOnly Property WindowSize() As Integer
Get
Return m_windowSize
End Get
End Property
'#Region "IReceivableSourceBlock<TOutput> members"
' Attempts to synchronously receive an item from the source.
Public Function TryReceive(ByVal filter As Predicate(Of T()), <System.Runtime.InteropServices.Out()> ByRef item() As T) As Boolean Implements IReceivableSourceBlock(Of T()).TryReceive
Return m_source.TryReceive(filter, item)
End Function
' Attempts to remove all available elements from the source into a new
' array that is returned.
Public Function TryReceiveAll(<System.Runtime.InteropServices.Out()> ByRef items As IList(Of T())) As Boolean Implements IReceivableSourceBlock(Of T()).TryReceiveAll
Return m_source.TryReceiveAll(items)
End Function
'#End Region
#Region "ISourceBlock<TOutput> members"
' Links this dataflow block to the provided target.
Public Function LinkTo(ByVal target As ITargetBlock(Of T()), ByVal linkOptions As DataflowLinkOptions) As IDisposable Implements ISourceBlock(Of T()).LinkTo
Return m_source.LinkTo(target, linkOptions)
End Function
' Called by a target to reserve a message previously offered by a source
' but not yet consumed by this target.
Private Function ReserveMessage(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T())) As Boolean Implements ISourceBlock(Of T()).ReserveMessage
Return m_source.ReserveMessage(messageHeader, target)
End Function
' Called by a target to consume a previously offered message from a source.
Private Function ConsumeMessage(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T()), ByRef messageConsumed As Boolean) As T() Implements ISourceBlock(Of T()).ConsumeMessage
Return m_source.ConsumeMessage(messageHeader, target, messageConsumed)
End Function
' Called by a target to release a previously reserved message from a source.
Private Sub ReleaseReservation(ByVal messageHeader As DataflowMessageHeader, ByVal target As ITargetBlock(Of T())) Implements ISourceBlock(Of T()).ReleaseReservation
m_source.ReleaseReservation(messageHeader, target)
End Sub
#End Region
#Region "ITargetBlock<TInput> members"
' Asynchronously passes a message to the target block, giving the target the
' opportunity to consume the message.
Private Function OfferMessage(ByVal messageHeader As DataflowMessageHeader, ByVal messageValue As T, ByVal source As ISourceBlock(Of T), ByVal consumeToAccept As Boolean) As DataflowMessageStatus Implements ITargetBlock(Of T).OfferMessage
Return m_target.OfferMessage(messageHeader, messageValue, source, consumeToAccept)
End Function
#End Region
#Region "IDataflowBlock members"
' Gets a Task that represents the completion of this dataflow block.
Public ReadOnly Property Completion() As Task Implements IDataflowBlock.Completion
Get
Return m_source.Completion
End Get
End Property
' Signals to this target block that it should not accept any more messages,
' nor consume postponed messages.
Public Sub Complete() Implements IDataflowBlock.Complete
m_target.Complete()
End Sub
' Causes this dataflow block to complete in a faulted state.
Public Sub Fault(ByVal [error] As Exception) Implements IDataflowBlock.Fault
m_target.Fault([error])
End Sub
#End Region
End Class
' Demonstrates usage of the sliding window block by sending the provided
' values to the provided propagator block and printing the output of
' that block to the console.
Private Shared Sub DemonstrateSlidingWindow(Of T)(ByVal slidingWindow As IPropagatorBlock(Of T, T()), ByVal values As IEnumerable(Of T))
' Create an action block that prints arrays of data to the console.
Dim windowComma As String = String.Empty
Dim printWindow = New ActionBlock(Of T())(Sub(window)
Console.Write(windowComma)
Console.Write("{")
Dim comma As String = String.Empty
For Each item As T In window
Console.Write(comma)
Console.Write(item)
comma = ","
Next item
Console.Write("}")
windowComma = ", "
End Sub)
' Link the printer block to the sliding window block.
slidingWindow.LinkTo(printWindow)
' Set the printer block to the completed state when the sliding window
' block completes.
slidingWindow.Completion.ContinueWith(Sub() printWindow.Complete())
' Print an additional newline to the console when the printer block completes.
Dim completion = printWindow.Completion.ContinueWith(Sub() Console.WriteLine())
' Post the provided values to the sliding window block and then signal
' that no more values will be posted.
For Each value As T In values
slidingWindow.Post(value)
Next value
slidingWindow.Complete()
' Wait for the printer to complete and perform its final action.
completion.Wait()
End Sub
Shared Sub Main(ByVal args() As String)
Console.Write("Using the DataflowBlockExtensions.Encapsulate method ")
Console.WriteLine("(T=int, windowSize=3):")
DemonstrateSlidingWindow(CreateSlidingWindow(Of Integer)(3), Enumerable.Range(0, 10))
Console.WriteLine()
Dim slidingWindow = New SlidingWindowBlock(Of Char)(4)
Console.Write("Using SlidingWindowBlock<T> ")
Console.WriteLine("(T=char, windowSize={0}):", slidingWindow.WindowSize)
DemonstrateSlidingWindow(slidingWindow, _
From n In Enumerable.Range(65, 10) _
Select ChrW(n))
End Sub
End Class
' Output:
'Using the DataflowBlockExtensions.Encapsulate method (T=int, windowSize=3):
'{0,1,2}, {1,2,3}, {2,3,4}, {3,4,5}, {4,5,6}, {5,6,7}, {6,7,8}, {7,8,9}
'
'Using SlidingWindowBlock<T> (T=char, windowSize=4):
'{A,B,C,D}, {B,C,D,E}, {C,D,E,F}, {D,E,F,G}, {E,F,G,H}, {F,G,H,I}, {G,H,I,J}
'