How to convert a Synchronous State Machine to Asynchronous

In my previous post, we looked at a basic state machine implementation for SCTP. In that state machine, the Abort event was implemented with a blocking call that wrapped BeginSend directly in EndSend. The code is repeated below:

internal class ActiveState : AssociationState
{
    public override AssociationState Abort(Association association)
    {
        var chunk = new AbortChunk();
        // Calling EndSend right away blocks this thread until the send completes.
        association.PacketBuilder.EndSend(
            association.PacketBuilder.BeginSend(
                new Chunk[] { chunk }, null, null));
        return AssociationStates.Closed;
    }
}

In this post, we'll take a look at how to remove this problem, and then we'll take a break from state machines to look at some other interesting topics for a while.

 

Asynchronous event processing

 

The best way to show the fix is to work backward from the problem code and see what changes are required.  The first thing to do is to change the state method signatures to support asynchronous processing.  This means the method to process Abort now has the following signature:

public virtual IAsyncResult BeginAbort(
    Association association,
    AsyncCallback asyncCallback,
    object state);

public virtual AssociationState EndAbort(IAsyncResult result);

Recognize this pattern? It is time to start using your IAsyncResult implementation again. Since the default processing of an event is to do nothing, we need an implementation that asynchronously does nothing to use as the default behavior for processing of each event. To reuse code, I added BeginDoNothing/EndDoNothing as follows:

private IAsyncResult BeginDoNothing(
    AsyncCallback asyncCallback,
    object state,
    string operationId)
{
    var result = new SimpleApplyEventAsyncResult(
            () => { return default(AssociationState); },
            asyncCallback,
            state,
            this,
            operationId);
    // The delegate effectively does nothing.
    result.Process();
    return result;
}

private AssociationState EndDoNothing(
    IAsyncResult result,
    string operationId)
{
    var state = AsyncResult<AssociationState>.End(result, this, operationId);
    return state;
}

This makes default processing for each event simple:

   

public virtual IAsyncResult BeginAbort(
    Association association,
    AsyncCallback asyncCallback,
    object state)
{
    return this.BeginDoNothing(asyncCallback, state, "BeginAbort");
}

public virtual AssociationState EndAbort(IAsyncResult result)
{
    return this.EndDoNothing(result, "BeginAbort");
}

SimpleApplyEventAsyncResult just takes a delegate that cannot fail and returns either the next state or null, where null means no state change is needed. This makes it easy to convert much of the code that was synchronous.
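To make the idea concrete, here is a minimal sketch of what such a class could look like. It is not the implementation from the download: SketchState and SimpleResultSketch are hypothetical names, and the owner and operationId checks of the real AsyncResult<T> base class are left out.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the post's AssociationState.
public class SketchState
{
    public SketchState(string name) { Name = name; }
    public string Name { get; }
}

// Sketch: an IAsyncResult that runs a non-failing delegate synchronously
// and stores the next state (null means "no state change").
public sealed class SimpleResultSketch : IAsyncResult
{
    private readonly Func<SketchState> m_work;
    private readonly AsyncCallback m_callback;
    private readonly ManualResetEvent m_done = new ManualResetEvent(false);
    private SketchState m_next;

    public SimpleResultSketch(Func<SketchState> work, AsyncCallback callback, object state)
    {
        m_work = work;
        m_callback = callback;
        AsyncState = state;
    }

    public object AsyncState { get; }
    public WaitHandle AsyncWaitHandle { get { return m_done; } }
    public bool CompletedSynchronously { get { return true; } }
    public bool IsCompleted { get; private set; }

    // Runs the delegate, records its result, then signals completion.
    public void Process()
    {
        m_next = m_work();
        IsCompleted = true;
        m_done.Set();
        if (m_callback != null) m_callback(this);
    }

    // Counterpart of an EndXXX call: waits for completion, returns the next state.
    public static SketchState End(IAsyncResult result)
    {
        var typed = (SimpleResultSketch)result;
        typed.m_done.WaitOne();
        return typed.m_next;
    }
}
```

Because Process runs on the caller's thread in this sketch, the callback fires before the BeginXXX method returns, which is why CompletedSynchronously reports true.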

I changed all the event processing signatures except for Enter/Exit for the time being. They require special consideration.

For Abort in the Active state, a custom IAsyncResult implementation is required:

public override IAsyncResult BeginAbort(
    Association association,
    AsyncCallback asyncCallback,
    object state)
{
    var result = new AbortEventAsyncResult(
        association,
        asyncCallback,
        state,
        this,
        "BeginAbort");
    result.Process();
    return result;
}

public override AssociationState EndAbort(IAsyncResult result)
{
    var state = AsyncResult<AssociationState>.End(result, this, "BeginAbort");
    return state;
}

The implementation of AbortEventAsyncResult sends an abort chunk asynchronously before completing:

internal override void Process()
{
    var chunk = new AbortChunk();
    // Start the send; SendCompleted runs when it finishes.
    m_association.PacketBuilder.BeginSend(
        new Chunk[] { chunk }, this.SendCompleted, null);
}

private void SendCompleted(IAsyncResult result)
{
    Exception error = null;
    try
    {
        m_association.PacketBuilder.EndSend(result);
    }
    catch (SocketException exception)
    {
        // Only socket failures are recorded as a failed send.
        error = exception;
    }
    finally
    {
        if (error == null)
        {
            this.SetResult(AssociationStates.Closed);
        }
        else
        {
            // Don't change the state if we cannot send.
            this.SetResult(default(AssociationState));
        }
        // association.CleanUp()
        // Complete without reporting the error to the caller.
        this.Complete(null /*exception*/);
    }
}

I now have the flexibility to decide whether or not to report the error on failure, and to make a different state transition on failure than on success. I'll come back to this, as there are interesting design decisions to think about.

In ShutdownReceived state there was special code to ensure messages were not sent in that state:

public override AssociationState SendMessage(Association association, byte[] message)
{
    throw new InvalidOperationException();
}

This behavior can be preserved by throwing in the BeginSendMessage method.
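A minimal sketch of that idea follows. The class names and the simplified BeginSendMessage signature (without the Association argument) are assumptions made for illustration, not the code from the download.

```csharp
using System;

// Hypothetical base class with a simplified asynchronous send event.
public abstract class SendStateSketch
{
    public abstract IAsyncResult BeginSendMessage(
        byte[] message,
        AsyncCallback asyncCallback,
        object state);
}

// Hypothetical ShutdownReceived state that rejects sends.
public sealed class ShutdownReceivedSketch : SendStateSketch
{
    public override IAsyncResult BeginSendMessage(
        byte[] message,
        AsyncCallback asyncCallback,
        object state)
    {
        // Throw before any asynchronous work is started, so no
        // IAsyncResult is ever created for the rejected send.
        throw new InvalidOperationException(
            "Messages cannot be sent in the ShutdownReceived state.");
    }
}
```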

The rest of the implementations can either use SimpleApplyEventAsyncResult or require their own IAsyncResult implementation. This is not hard to change, although it is quite time-consuming. It is unfortunate that there is no tooling available to help make this change more quickly.

Fixing the State Machine

We updated the processing of events to be asynchronous, but the rest of the related code has not been fixed yet. There are three things to really look at:

  • We have conveniently ignored the problem of sequencing events from the application and events from the network up until now. There is no locking or queue in place for this. Now that it is asynchronous, events could arrive and be processed in parallel causing unpredictable behavior.
  • The SetState method which drove the state machine worked with synchronous methods. We need to account for asynchronous methods.
  • Events from the network are processed before retrieving the next packet. If adding a queue, we don't want to introduce a situation where we pull a lot of data from the socket and store it in memory. This can be a security issue where an attacker could take out our application by sending a lot of data.

Sequencing Event Processing

To sequence the event processing, we will need a queue. The queue will need to know about processing asynchronous tasks and not process the next item until the previous one has completed. This is shown below:

The only thing common about all our event processing methods is that they are asynchronous, and they return the next state if a change is necessary or null if no change is necessary. The queue really shouldn't know anything specific about each event being processed. It just needs to know:

  • How to start the processing
  • When to start the next item.
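Those two requirements can be sketched as a queue of "start" delegates, each of which receives a "done" callback to invoke when its work has completed. This is only an illustration under that assumption; SequencedQueueSketch is a hypothetical name and not the queue used in the download.

```csharp
using System;
using System.Collections.Generic;

// Sketch of a queue that runs one asynchronous item at a time.
// An item is a delegate that starts the work and is handed a
// callback to invoke once the work has finished.
public sealed class SequencedQueueSketch
{
    private readonly Queue<Action<Action>> m_pending = new Queue<Action<Action>>();
    private readonly object m_lock = new object();
    private bool m_running;

    public void Enqueue(Action<Action> startItem)
    {
        lock (m_lock)
        {
            m_pending.Enqueue(startItem);
            if (m_running) return;   // the active item's completion starts it later
            m_running = true;
        }
        StartNext();
    }

    private void StartNext()
    {
        Action<Action> next;
        lock (m_lock)
        {
            if (m_pending.Count == 0)
            {
                m_running = false;
                return;
            }
            next = m_pending.Dequeue();
        }
        // Start the item outside the lock; it calls StartNext when done.
        // Items that finish synchronously recurse here, so a long run of
        // them deepens the stack in this simple version.
        next(StartNext);
    }
}
```

The queue never inspects what an item does. It only starts the item and waits for the callback, which matches the two things listed above.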

One way to achieve this is to create an interface, IWorkitem, implement it in one class per event, and put those objects in the queue for processing. One downside is that, in addition to adding a method to the state class for each event, we now have to add a class that implements IWorkitem for each event we handle. Another downside is that each event source has to know how to create work items and queue them.

Another possible way is to create a custom BeginApplyXXX method at each event source that ensures the request goes through the queue before the event processing is called. These methods could use a common base IAsyncResult that knows how to deal with the queue before calling the custom code for each event being processed. This still has the downside that you have to create a custom IAsyncResult implementation at the event source for each event.

The approach I chose is to pass the customized parts of the event processing (how to start the task, how to call the end method) into a single BeginApplyEvent method. Since I only have a single BeginApplyEvent method, I should only need one IAsyncResult implementation.

Here is an example call to the BeginApplyEvent method I created. This hopefully will help you understand the explanations below of how the BeginXXX and EndXXX methods are specified:

m_association.BeginApplyEvent(
    (callback, currentState) => /* Run the BeginXXX Method */
    {
        currentState.BeginReceive_CookieEcho(
            m_association,
            m_common,
            chunk as CookieEchoChunk,
            callback,
            null /*state*/);
    },
    /* GetEndXXXMethod */
    currentState => { return currentState.EndReceive_CookieEcho; },
    this.ProcessChunkCompleted,
    null);

Specifying EndXXX Method

The customization required for the end method is only the name of the EndXXX method on the state to call. They all have the same signature:

public override AssociationState EndXXX(IAsyncResult result);

This means one argument, the end method you want to call (i.e. Func<IAsyncResult, AssociationState>) could be passed to BeginApplyEvent. The problem though is that the method needs to be called on the current state. You don't know the current state when you call BeginApplyEvent because there may be other events to process before the one you are currently creating makes it to the front of the processing queue. For this reason, you can write code to return the EndXXX method to use when your event is first in the queue and you are told what the current state is. The method signature would look like:

Func<IAsyncResult, AssociationState> GetEndMethod(AssociationState currentState);

  

Specifying BeginXXX Method

Passing code to invoke the BeginXXX method is a little harder, because each BeginXXX method takes a different set of arguments. You could wrap the arguments in a class yourself, or you can let the compiler do it for you: if you pass a lambda, the compiler captures the arguments it uses in a generated closure class.

To invoke the right BeginXXX method, we need to know what the current state is when the event is first in the queue. This means we need to have the currentState passed to us in the same way as the EndXXX method.

Because the BeginApplyEvent operation is running our BeginXXX event processing method as part of its sequence of tasks, it needs to tell us what AsyncCallback to call when the event processing is complete. After processing is complete, the BeginApplyEvent operation needs to apply the state returned from our EndXXX method. The callback we are to use for our BeginXXX call is passed to the delegate we supply so we can build our BeginXXX properly. You can see an example delegate passed as an argument repeated here:

(callback, currentState) => /* Run the BeginXXX Method */
{
    currentState.BeginReceive_CookieEcho(
        m_association,
        m_common,
        chunk as CookieEchoChunk,
        callback,
        null /*state*/);
},

 

The delegate has the following signature:

Action<AsyncCallback,AssociationState>
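To see how the two delegates fit together, here is a stripped-down sketch of an apply routine. It is an illustration only: it leaves out the queue and the outer IAsyncResult, StateSketch, MachineSketch, BeginPing and EndPing are hypothetical names, and Task<T> is used as the IAsyncResult purely to keep the example short.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical state with a single Begin/End event pair.
public class StateSketch
{
    private readonly StateSketch m_next;   // null means "stay in this state"

    public StateSketch(string name, StateSketch next)
    {
        Name = name;
        m_next = next;
    }

    public string Name { get; }

    public IAsyncResult BeginPing(AsyncCallback callback, object state)
    {
        // Task<T> implements IAsyncResult, so a completed task can stand in
        // for a custom IAsyncResult implementation in this sketch.
        Task<StateSketch> task = Task.FromResult(m_next);
        callback(task);
        return task;
    }

    public StateSketch EndPing(IAsyncResult result)
    {
        return ((Task<StateSketch>)result).Result;
    }
}

public sealed class MachineSketch
{
    public MachineSketch(StateSketch initial) { Current = initial; }

    public StateSketch Current { get; private set; }

    public void ApplyEvent(
        Action<AsyncCallback, StateSketch> begin,
        Func<StateSketch, Func<IAsyncResult, StateSketch>> getEnd)
    {
        // The current state is read only now, when the event is processed.
        StateSketch current = Current;
        Func<IAsyncResult, StateSketch> end = getEnd(current);
        begin(
            result =>
            {
                // Apply the state returned by the EndXXX method, if any.
                StateSketch next = end(result);
                if (next != null) Current = next;
            },
            current);
    }
}
```

A call then has the same shape as the BeginApplyEvent example earlier: machine.ApplyEvent((callback, s) => s.BeginPing(callback, null), s => s.EndPing).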

  

Finishing the conversion

Changing all the synchronous event calls to BeginApplyEvent calls with delegates is a bit of work and takes some time. It takes a few conversions before you understand the pattern.

After making all the BeginApplyEvent call changes, the rest of the code that generates events needs to be updated. Most of it is straightforward, but because a packet can have more than one chunk to process, we had to do a special fix for received packets. The ReceivePacket method had to be changed to BeginReceivePacket, which calls BeginApplyEvent for each chunk in the packet.

What about the queue?

Now that all events are processed by calling BeginApplyEvent, we can put the IAsyncResults from BeginApplyEvent calls in a queue and process them one at a time.  The calling code doesn't have to know there is a queue there at all.

What about the SetState method?

The BeginApplyEvent method replaces SetState as the interface for event sources to apply events to the association state machine. We still use the old synchronous SetState method from our BeginApplyEvent implementation to change the state when an EndXXX method reports a state change. There are no threading issues anymore because only one event is applied at a time.

Did the queue introduce a security problem?

I don't believe we introduced any additional issues by adding our queue. The BeginReceivePacket processes the packet entirely by calling BeginApplyEvent until all chunks have been run through the state machine. Only when finished does it request more data from the network. This is the same behavior as we had with the synchronous version ReceivePacket.
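The ordering described here can be sketched as follows. This is a simplified illustration, not the BeginReceivePacket code from the download: PacketPumpSketch and both delegates are hypothetical, chunks are plain strings, and the receive call is synchronous to keep the example small.

```csharp
using System;

// Sketch: request the next packet only after every chunk of the
// current packet has been applied to the state machine.
public sealed class PacketPumpSketch
{
    // Hypothetical: returns the chunks of one packet, or null when closed.
    private readonly Func<string[]> m_receivePacket;

    // Hypothetical: applies one chunk, then invokes the callback.
    private readonly Action<string, Action> m_applyChunk;

    public PacketPumpSketch(
        Func<string[]> receivePacket,
        Action<string, Action> applyChunk)
    {
        m_receivePacket = receivePacket;
        m_applyChunk = applyChunk;
    }

    public int PacketsRead { get; private set; }

    public void ReceiveNext()
    {
        string[] packet = m_receivePacket();
        if (packet == null) return;
        PacketsRead++;
        ProcessChunk(packet, 0);
    }

    private void ProcessChunk(string[] packet, int index)
    {
        if (index == packet.Length)
        {
            // The whole packet has been handled; only now ask for more data.
            ReceiveNext();
            return;
        }
        m_applyChunk(packet[index], () => ProcessChunk(packet, index + 1));
    }
}
```

At most one packet is held in memory at a time, because the next receive is issued from the completion of the last chunk.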

To report exceptions or not from state machine

This is an interesting problem to think about. Basically, the question is, should our EndXXX methods ever throw?

Note that if an EndXXX method throws, the EndApplyEvent call that wraps the event processing will also throw.

One way to think about it is that if applying an event fails you would either ignore it and stay in the same state, or handle the exception and suggest moving to a different state. In either case, reporting the exception in the EndXXX method would not make sense because the error should have been handled already. This is especially true for events coming from the network.

Another consideration is for events coming from the API.  For BeginConnect, BeginShutdown, and BeginAbort API calls, it would be good to report the error to let the caller know it didn't work. Other than reporting though, there is not much the state machine can do about it.

The BeginSend API is different. Its success or failure doesn't necessarily affect the state of the state machine. When calling a BeginSend API, reporting the failure is important to the application. It could retry, or ignore the failure as needed.  This shows that each event coming from the API needs to be considered individually.

From this quick analysis for calls from the API, it seems logical that EndXXX may need to throw, but only to pass the error information back to the application. This could be in addition to making any state change decisions on success or failure.
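One way to picture "throw, but still make the state decision" is an outcome object that carries both pieces of information. The sketch below is an assumption about how that could look, with hypothetical names and states reduced to strings. It is not the design used in the download.

```csharp
using System;
using System.Runtime.ExceptionServices;

// Hypothetical result of processing one API event.
public sealed class EventOutcomeSketch
{
    public EventOutcomeSketch(string nextState, Exception error)
    {
        NextState = nextState;
        Error = error;
    }

    public string NextState { get; }   // null means "no state change"
    public Exception Error { get; }    // null means success
}

public sealed class ApiMachineSketch
{
    public ApiMachineSketch(string initial) { Current = initial; }

    public string Current { get; private set; }

    // Applies the state decision first, then reports the failure
    // to the application by rethrowing the stored exception.
    public void EndApplyEvent(EventOutcomeSketch outcome)
    {
        if (outcome.NextState != null) Current = outcome.NextState;
        if (outcome.Error != null)
        {
            ExceptionDispatchInfo.Capture(outcome.Error).Throw();
        }
    }
}
```

The state machine has already moved on by the time the exception reaches the caller, so the caller only learns about the failure and cannot interfere with the transition.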

Enter/Exit events

I intentionally skipped changing the Enter/Exit events on the states to be asynchronous. The reason is that it is not really clear whether that is the right thing to do.

The reason for doing it is that we can ensure asynchronous tasks always get executed when entering or leaving a state. The problem, though, is that we have to decide what happens when the task in the Enter or Exit action fails. Enter and Exit actions don't change state as currently designed.

It seems if you have real work that could fail, you need to add transitional states to do the Enter and Exit action. If the work fails you can change to a different state than when it succeeds. You can ensure that all transitions pass through the new state before going to the corresponding stable state.

While in these transitional states, what happens to other events that may be coming in?  It seems that you would want to defer them until you get to a stable state where you can replay and thus process them. Fixing the Enter/Exit transitions and deferring events are big topics for other posts.

Summary

I may have burned a few more of your brain cells on this post than usual. In the end, though, we did convert a synchronous state machine to an asynchronous state machine and removed the blocking code problem for the Abort event. To really understand how the code was updated, you need to look at the before and after implementations. This was a lot of work, but there was a pattern or two followed to implement the solution. That means we may be able to create some tools to help us in the future.

As promised, I will take a break from state machines and take on a few lighter technology-exploration topics to let your brain rest.

20110516_StatePatternAsyncSCTP.zip