Chaining async calls in WCF (or in .NET in general)

Recently I had to implement an asynchronous operation which would in turn call another asynchronous operation (i.e., cascading async operations). For simple cases, when there's no intermediate processing in the middle of the chains (i.e., no new state that needs to be passed around), this is fairly straightforward (or as straightforward as async programming can be):

using System;
using System.ServiceModel;

[ServiceContract]
public interface ITest
{
    [OperationContract(AsyncPattern = true)]
    IAsyncResult BeginEchoString(string input, AsyncCallback callback, object userState);
    string EndEchoString(IAsyncResult asyncResult);
}

public class Service : ITest
{
    string EchoStringDoWork(string input)
    {
        return input;
    }

    public IAsyncResult BeginEchoString(string input, AsyncCallback callback, object userState)
    {
        // No extra state: the caller's callback and state go straight to the inner async call.
        Func<string, string> func = EchoStringDoWork;
        return func.BeginInvoke(input, callback, userState);
    }

    public string EndEchoString(IAsyncResult asyncResult)
    {
        // The IAsyncResult returned by BeginInvoke exposes the delegate it was created from.
        System.Runtime.Remoting.Messaging.AsyncResult typedAsyncResult =
            (System.Runtime.Remoting.Messaging.AsyncResult)asyncResult;
        Func<string, string> func = (Func<string, string>)typedAsyncResult.AsyncDelegate;
        return func.EndInvoke(asyncResult);
    }
}

This can be shown in the image below

When I needed to pass around some extra information, however, I found out that it wasn't as easy as before, and since I couldn't bing (or google, yes, I also tried there) any information about that, I decided to post my solution here. I really hope this isn't the "best" way to do it, and that I simply didn't search well enough, as I find it a lot harder than it should be.

Basic Async Programming Model

First some basic rules for the Begin/End async programming model (APM). There's also an event-based model which is a lot simpler than the Begin/End one, but in many cases it's not available, so I'll not discuss it here. A user makes a BeginXXX call into a library, passing a callback (System.AsyncCallback delegate) along with some additional parameters. When the library work is done, it will invoke the callback delegate, notifying the user that the results are available. Finally, the user will call EndXXX to fetch the results from the operation.
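As a minimal sketch of that Begin/callback/End sequence, the example below uses Stream.BeginRead and Stream.EndRead on a MemoryStream as a stand-in for the library's BeginXXX/EndXXX pair. The class and method names (ApmBasics, ReadWithApm) are made up for illustration, and the wait at the end is only there so the console program doesn't exit before the callback runs.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;

public static class ApmBasics
{
    // Reads the whole buffer with BeginRead/EndRead and returns the decoded text.
    public static string ReadWithApm(string text)
    {
        byte[] source = Encoding.UTF8.GetBytes(text);
        byte[] buffer = new byte[source.Length];
        string result = null;
        using (MemoryStream stream = new MemoryStream(source))
        using (ManualResetEvent done = new ManualResetEvent(false))
        {
            // BeginXXX: pass the callback (and a state object, unused here).
            stream.BeginRead(buffer, 0, buffer.Length, delegate(IAsyncResult ar)
            {
                // The callback signals that the result is ready; EndXXX fetches it.
                int bytesRead = stream.EndRead(ar);
                result = Encoding.UTF8.GetString(buffer, 0, bytesRead);
                done.Set();
            }, null);

            // A real caller would do other work here; the demo just waits
            // so that the program doesn't finish before the callback runs.
            done.WaitOne();
        }
        return result;
    }

    public static void Main()
    {
        Console.WriteLine(ReadWithApm("hello APM"));
    }
}
```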

Notice that there are cases when one can make synchronous calls to Begin/End operations (either by waiting on the WaitHandle, https://msdn.microsoft.com/en-us/library/ms228962.aspx, or by calling EndXXX before the callback is invoked, https://msdn.microsoft.com/en-us/library/ms228967.aspx), but this defeats the purpose of the APM (being able to do other tasks while the operation is being executed), and in some cases it's not valid to do so (like in Silverlight), so again I'll not get into those cases.

The main (and only) point where a user can pass some state along asynchronous calls is the last parameter of the operation, usually called userState or asyncState, of type System.Object. This state object must always be returned to the user: both in the return value of the BeginXXX call (typed IAsyncResult) and in the IAsyncResult (IAR) parameter passed to the callback specified by the user. If you need some extra information to be passed along with the state to the next operation in the chain, however, the code starts getting a lot more complex.
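To make that round trip concrete, here is a small sketch (again using Stream.BeginRead on a MemoryStream as the stand-in operation; ApmStateDemo and StateRoundTrips are hypothetical names) that checks that the same state object shows up in both places.

```csharp
using System;
using System.IO;
using System.Threading;

public static class ApmStateDemo
{
    // Returns true if the state passed to BeginRead is visible both on the
    // IAsyncResult returned by BeginRead and on the one given to the callback.
    public static bool StateRoundTrips(object userState)
    {
        byte[] buffer = new byte[4];
        object stateSeenInCallback = null;
        using (MemoryStream stream = new MemoryStream(new byte[] { 1, 2, 3, 4 }))
        using (ManualResetEvent done = new ManualResetEvent(false))
        {
            IAsyncResult returned = stream.BeginRead(buffer, 0, buffer.Length, delegate(IAsyncResult ar)
            {
                stateSeenInCallback = ar.AsyncState; // state as seen by the callback
                stream.EndRead(ar);
                done.Set();
            }, userState);

            done.WaitOne();
            return ReferenceEquals(returned.AsyncState, userState)
                && ReferenceEquals(stateSeenInCallback, userState);
        }
    }

    public static void Main()
    {
        Console.WriteLine(StateRoundTrips(new object()));
    }
}
```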

Chaining async calls with additional information

The first problem with the simple approach for additional state: if you don't pass the userState parameter from the user, you can't return the same IAsyncResult received from the call to BeginOp2 in BeginOp1. You'll essentially need to create a new implementation of the IAsyncResult interface to return to the user, so that they can access their AsyncState property from the resulting IAsyncResult object. That's already an extra class needed here.

The second problem is that, since you passed a different userState to BeginOp2, you cannot pass along the callback the user supplied; otherwise it would be called with your state, not theirs. So we need an intermediate callback method to receive the notification that Op2 is done, and only then can we call the user callback, passing their state to them.
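The second problem can be reproduced in a few lines. In the sketch below, BeginOp1Naive is a hypothetical "Op1" that forwards the user's callback to the inner operation (Stream.BeginRead standing in for BeginOp2) while passing its own state object; the user's callback then observes that internal state instead of the state the user supplied.

```csharp
using System;
using System.IO;
using System.Threading;

public static class NaiveChainingDemo
{
    // Hypothetical internal state that Op1 wants to carry along the chain.
    private sealed class InternalState
    {
        public string Note;
    }

    // Naive BeginOp1: forwards the user's callback to the inner operation
    // but replaces the user's state with its own.
    public static IAsyncResult BeginOp1Naive(Stream stream, byte[] buffer, AsyncCallback callback, object userState)
    {
        InternalState myState = new InternalState { Note = "extra information" };
        return stream.BeginRead(buffer, 0, buffer.Length, callback, myState);
    }

    // Returns true only if the user's callback saw the user's own state.
    public static bool UserCallbackSeesUserState()
    {
        object userState = "user state";
        object seen = null;
        byte[] buffer = new byte[1];
        using (MemoryStream stream = new MemoryStream(new byte[] { 42 }))
        using (ManualResetEvent done = new ManualResetEvent(false))
        {
            BeginOp1Naive(stream, buffer, delegate(IAsyncResult ar)
            {
                seen = ar.AsyncState; // this is the InternalState, not "user state"
                stream.EndRead(ar);   // complete the inner operation
                done.Set();
            }, userState);
            done.WaitOne();
        }
        return ReferenceEquals(seen, userState);
    }

    public static void Main()
    {
        Console.WriteLine(UserCallbackSeesUserState()); // prints "False"
    }
}
```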

Now, when the user receives the callback, they will call EndOp1 passing only the IAsyncResult parameter (the same one you passed to them in the call to callback1). On EndOp1, you will need to return the result of the method, so any information necessary to do so must be contained in the IAsyncResult itself (and the "only" place that can be used to store additional information, the AsyncState property, is already taken by the user's userState).

Confusing? I thought so too. Hopefully the diagram below will give a little clarity to the scenario.

My (more complex than I'd like) solution

The first thing that needed to be addressed was the need for a new IAsyncResult implementation. The easiest way to do that was to wrap the IAR returned by BeginOp2 in another IAR which could contain additional information. All properties are delegated to the inner IAR, except AsyncState, which should be the user-provided one. To make managing the APM data easier, I forced my state to contain both the user callback and the user state; this way I could write a generic IAR implementation and reuse it in different locations. On BeginOp1, I create this new IAR implementation and return it to the user.

The next step is the callback (i.e., "callback2"), which receives the state with all the user information (callback delegate, user state), so I can simply recreate the IAR to send it back to the user callback (i.e., "callback1"). When I receive my callback I call EndOp2 right away and store all the result information in my state (another option would be to defer calling EndOp2 until the user calls EndOp1, in which case the state wouldn't need to store the result information, but it would need to store the IAR passed to the callback to be able to call EndOp2 at that point - see diagram below for this scenario).

Finally, on the EndOp1 implementation I used the fact that the user must call EndOp1 with the same IAR that is passed to its callback, so I can cast it to my own class and retrieve the state from there. Since the state already had all the information for the result, the job is done.
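The deferred alternative mentioned above (calling EndOp2 only when the user calls EndOp1) could look roughly like the sketch below. It is not the code I ended up using: Stream.BeginRead/EndRead stands in for Op2, and every name in it (DeferredEndDemo, Op1State, Op1AsyncResult, BeginOp1, EndOp1, Run) is made up for illustration. The wrapper IAR keeps the inner IAR so that EndOp1 can complete the inner operation later.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;

public static class DeferredEndDemo
{
    // Hypothetical state: the user's callback and state plus what EndOp1 needs.
    private sealed class Op1State
    {
        public AsyncCallback UserCallback;
        public object UserState;
        public Stream Stream;
        public byte[] Buffer;
        public string Prefix; // the "extra information" carried along the chain
    }

    // Wrapper IAR: exposes the user's state and delegates everything else.
    private sealed class Op1AsyncResult : IAsyncResult
    {
        public readonly IAsyncResult Inner;
        public readonly Op1State State;
        public Op1AsyncResult(IAsyncResult inner, Op1State state) { Inner = inner; State = state; }
        public object AsyncState { get { return State.UserState; } }
        public WaitHandle AsyncWaitHandle { get { return Inner.AsyncWaitHandle; } }
        public bool CompletedSynchronously { get { return Inner.CompletedSynchronously; } }
        public bool IsCompleted { get { return Inner.IsCompleted; } }
    }

    public static IAsyncResult BeginOp1(Stream stream, string prefix, AsyncCallback callback, object userState)
    {
        Op1State state = new Op1State
        {
            UserCallback = callback, UserState = userState,
            Stream = stream, Buffer = new byte[256], Prefix = prefix
        };
        IAsyncResult inner = stream.BeginRead(state.Buffer, 0, state.Buffer.Length, OnReadDone, state);
        return new Op1AsyncResult(inner, state);
    }

    // Intermediate callback ("callback2"): does NOT call EndRead, it only
    // wraps the inner IAR and notifies the user.
    private static void OnReadDone(IAsyncResult inner)
    {
        Op1State state = (Op1State)inner.AsyncState;
        if (state.UserCallback != null)
        {
            state.UserCallback(new Op1AsyncResult(inner, state));
        }
    }

    // EndOp1 completes the inner operation using the IAR kept in the wrapper.
    public static string EndOp1(IAsyncResult asyncResult)
    {
        Op1AsyncResult wrapper = (Op1AsyncResult)asyncResult;
        int read = wrapper.State.Stream.EndRead(wrapper.Inner);
        return wrapper.State.Prefix + Encoding.UTF8.GetString(wrapper.State.Buffer, 0, read);
    }

    // Demo driver: returns the result if the callback saw the user's state.
    public static string Run(string text)
    {
        object userState = "user state";
        string result = null;
        using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(text)))
        using (ManualResetEvent done = new ManualResetEvent(false))
        {
            BeginOp1(stream, "read: ", ar =>
            {
                if (ReferenceEquals(ar.AsyncState, userState)) result = EndOp1(ar);
                done.Set();
            }, userState);
            done.WaitOne();
        }
        return result;
    }

    public static void Main()
    {
        Console.WriteLine(Run("hello"));
    }
}
```

The trade-off is the one described above: the state no longer holds the result, but the wrapper has to hold on to the inner IAR (and the state to the stream) until the user calls EndOp1.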

And here is my implementation of a WCF operation using the async pattern, which calls another async operation (HttpWebRequest.BeginGetResponse). We need the HttpWebRequest instance to be able to call EndGetResponse after the callback is invoked, so I needed all the complexities required by the additional state (this isn't the best example, as HttpWebRequest has a synchronous GetResponse method, but it's simple enough to be used in this post).

// Chaining async operations
using System;
using System.Net;
using System.ServiceModel;
using System.Threading;

public class BlogPost
{
    [ServiceContract]
    public interface ITest
    {
        [OperationContract(AsyncPattern = true)]
        IAsyncResult BeginProcess(string text, AsyncCallback callback, object userState);
        string EndProcess(IAsyncResult asyncResult);
    }

    // Synchronous view of the same contract, used by the client in Test()
    [ServiceContract(Name = "ITest")]
    public interface ISyncTest
    {
        [OperationContract]
        string Process(string text);
    }

    public class Service : ITest
    {
        // State for one call: user callback/state (base class) plus the data EndProcess needs
        internal class MyState : MyUserStateBase
        {
            public MyState(AsyncCallback userCallback, object userState) : base(userCallback, userState) { }
            public HttpWebRequest req;
            public string input;
            public string GETResponse;
        }

        public IAsyncResult BeginProcess(string text, AsyncCallback callback, object userState)
        {
            Console.WriteLine("[server] Inside BeginProcess");
            Uri baseAddress = OperationContext.Current.Host.BaseAddresses[0];
            HttpWebRequest req = (HttpWebRequest)HttpWebRequest.Create(baseAddress);
            req.Method = "GET";
            MyState myState = new MyState(callback, userState) { input = text, req = req };
            Console.WriteLine("[server] Inside BeginProcess, called HttpWebRequest.BeginGetResponse");
            IAsyncResult asyncResult = req.BeginGetResponse(new AsyncCallback(HttpGetCallback), myState);
            return new MyAsyncResult<MyState>(asyncResult, myState);
        }

        void HttpGetCallback(IAsyncResult asyncResult)
        {
            Console.WriteLine("[server] Inside HttpGetCallback");
            MyState myState = (MyState)asyncResult.AsyncState;
            HttpWebResponse resp;
            try
            {
                resp = (HttpWebResponse)myState.req.EndGetResponse(asyncResult);
            }
            catch (WebException e)
            {
                resp = (HttpWebResponse)e.Response;
            }

            myState.GETResponse = String.Format("Size of help page = {0}", resp.ContentLength);
            Console.WriteLine("[server] Inside HttpGetCallback, calling the user callback");
            myState.UserCallback(new MyAsyncResult<MyState>(asyncResult, myState));
        }

        public string EndProcess(IAsyncResult asyncResult)
        {
            Console.WriteLine("[server] Inside EndProcess");
            MyAsyncResult<MyState> myAsyncResult = (MyAsyncResult<MyState>)asyncResult;
            MyState myState = myAsyncResult.AdditionalData;
            return String.Format("Original input: {0}; HTTP GET output: {1}", myState.input, myState.GETResponse);
        }
    }

    internal class MyUserStateBase
    {
        private AsyncCallback userCallback;
        private object userState;

        public MyUserStateBase(AsyncCallback userCallback, object userState)
        {
            this.userCallback = userCallback;
            this.userState = userState;
        }

        public AsyncCallback UserCallback
        {
            get { return this.userCallback; }
        }

        public object UserState
        {
            get { return this.userState; }
        }
    }

    internal class MyAsyncResult<T> : IAsyncResult where T : MyUserStateBase
    {
        private T additionalData;
        private IAsyncResult inner;

        public MyAsyncResult(IAsyncResult inner, T additionalData)
        {
            this.inner = inner;
            this.additionalData = additionalData;
        }

        public IAsyncResult Inner
        {
            get { return this.inner; }
        }

        public T AdditionalData
        {
            get { return this.additionalData; }
        }

        #region IAsyncResult Members
        public object AsyncState
        {
            get { return this.additionalData.UserState; }
        }

        public WaitHandle AsyncWaitHandle
        {
            get { return inner.AsyncWaitHandle; }
        }

        public bool CompletedSynchronously
        {
            get { return inner.CompletedSynchronously; }
        }

        public bool IsCompleted
        {
            get { return inner.IsCompleted; }
        }
        #endregion
    }

    public static void Test()
    {
        // BasicHttpBinding with default settings has no transport security, so the address must use http
        string baseAddress = "http://" + Environment.MachineName + ":8000/Service";
        ServiceHost host = new ServiceHost(typeof(Service), new Uri(baseAddress));
        host.AddServiceEndpoint(typeof(ITest), new BasicHttpBinding(), "");
        host.Open();
        Console.WriteLine("Host opened");

        ChannelFactory<ISyncTest> factory = new ChannelFactory<ISyncTest>(new BasicHttpBinding(), new EndpointAddress(baseAddress));
        ISyncTest proxy = factory.CreateChannel();
        Console.WriteLine(proxy.Process("Hello"));

        ((IClientChannel)proxy).Close();
        factory.Close();
        Console.Write("Press ENTER to close the host");
        Console.ReadLine();
        host.Close();
    }
}
