Refactoring the Asynchronous State Machine to use TPL
In a previous post, I gave a possible implementation of an asynchronous state machine using the APM asynchronous pattern. There may be other reasons, but the topic was difficult to present clearly in a blog post because of the complexity and volume of code involved. I’m sure not too many people made it to the end of that post.
Why use an asynchronous state machine? I’ve seen a lot of asynchronous code suddenly go synchronous and blocking once it gets to the state machine of the code. It is difficult to implement without introducing threading safety issues and odd program states because the application is effectively in two states on different executing threads. It would be nice to have a easy pattern for this so we can take advantage of the increasing number of cores we have available.
I recently had the opportunity to refactor the code to use the Task Parallel Library, which greatly simplified the implementation. I am using the State Pattern here, where the methods on the class indicate the events the state machine has to handle. The signature of each state is now cleaned up significantly:
internal class AssociationState
{
public virtual void Enter(Association association, IAssociationToLayer notification);
public virtual void Exit(Association association, IAssociationToLayer notification);
#region Network events
public virtual Task<AssociationState> Receive_InitAckAsync(
Association association,
IAssociationToLayer notification,
InitAckChunk chunk,
object state);
public virtual Task<AssociationState> Receive_AbortAsync(
Association association,
AbortChunk chunk,
object state);
public virtual Task<AssociationState> Receive_InitAsync(
Association association,
InitChunk chunk,
object state);
public virtual Task<AssociationState> Receive_CookieEchoAsync(
Association association,
CommonHeader common,
CookieEchoChunk inChunk,
object state);
public virtual Task<AssociationState> Receive_SackAsync(
IPacketBuilder packetBuilder,
SackChunk chunk,
object state);
public virtual Task<AssociationState> Receive_CookieAckAsync(
IAssociationToLayer notification,
object state);
public virtual Task<AssociationState> Receive_ShutdownAsync(
IPacketBuilder packetBuilder,
IAssociationToLayer notification,
ShutdownChunk inChunk,
object state);
public virtual Task<AssociationState> Receive_ShutdownAckAsync(
IPacketBuilder packetBuilder,
IAssociationToLayer notification,
object state);
public virtual Task<AssociationState> Receive_ShutdownCompleteAsync(
IAssociationToLayer notification,
object state);
public virtual Task<AssociationState> Receive_DataAsync(
Association association,
DataChunk chunk,
object state);
#endregion
#region API Events
public virtual Task<AssociationState> AssociateAsync(
Association association,
IAssociationToLayer notification,
IPAddress address,
int peerPort,
AssociateOptions options,
object state);
public virtual Task<AssociationState> AbortAsync(
IPacketBuilder packetBuilder,
object state);
public virtual Task<AssociationState> ShutdownAsync(
object state);
public virtual Task<AssociationState> SendMessageAsync(
Association association,
byte[] message,
object state);
#endregion
#region Internal Events
public virtual Task<AssociationState> DataQueueClearAsync(
Association association,
object state);
#endregion
}
There is much less noise in the interface of the state. As it is a networking API, events can come from the stack, from the application, and from internal activity (such as expiration timers).
Sequencing the events
To operate reliably, the state machine needs to sequence (or at least coordinate) events to ensure that they get processed in the correct state. This is not difficult to accomplish with TPL. Tasks can be scheduled on a TaskScheduler that schedules tasks sequentially. There happens to be one example of a custom TaskScheduler in existence already, the LimitedConcurrencyTaskScheduler available from MSDN. When you set the concurrency level to one, it sequences the tasks.
Implementations of the methods can then specify the TaskScheduler to use as shown below:
internal static class AssociationStates
{
public static LimitedConcurrencyLevelTaskScheduler Scheduler = new LimitedConcurrencyLevelTaskScheduler(1);
// ...
}
public override Task<AssociationState> Receive_CookieEchoAsync(
Association association,
CommonHeader common,
CookieEchoChunk inChunk,
object state)
{
Task<AssociationState> outerTask = Task.Factory.StartNew<AssociationState>(
(state2) =>
{
CookieData cookie = inChunk.CookieData;
var peerEndpoint = new IPEndPoint(IPAddress.Loopback, common.SourcePort);
association.PacketBuilder.SetPeerEndpoint(peerEndpoint);
association.PacketBuilder.ProcessCookieEcho(inChunk, cookie);
association.InitializePacketReceiver(association.PacketBuilder.Mtu, cookie.RemoteTSN);
association.PacketReceiver.ProcessCookieEcho(inChunk, cookie);
// Send cookie Ack
var chunk = new CookieAckChunk();
association.PacketBuilder.SendAsync(
new Chunk[] { chunk }, null/*state*/).Wait();
return AssociationStates.Established;
},
state,
default(CancellationToken),
TaskCreationOptions.None,
AssociationStates.Scheduler);
return outerTask;
}
This works great for sequencing things, but you have to be careful. When you create tasks within a already executing Task it uses the same scheduler as the parent if not explicitly specified. Since the custom task scheduler schedules sequentially, this results in a deadlock unfortunately.
My way to get around the problem was to ensure the new tasks are explicitly routed to the default TaskScheduler. Here is an example from within the PacketBuilder.SendAsync method:
Task.Factory.StartNew(
this.ProcessSendChunks,
default(CancellationToken),
TaskCreationOptions.None,
TaskScheduler.Default);
I might also have been able to avoid the deadlock by modifying the TaskScheduler to allow child tasks to run so they don’t block.
Summary
Refactoring to the Task Parallel Library greatly simplifies asynchronous code and allows you to see the logic of your application more clearly. The TPL will likely be an important part of the solution to make an understandable pattern for asynchronous state machines.