Rx on the server, part 5 of n: Observable ASP.NET
(Reposted due to layout issues)
Previously in these series we saw:
- Reading a stream asynchronously
- Reading strings asynchronously, line by line
- Writing an observable to a stream
- Buffering output to a stream
All previous posts dealt with interacting between observable and IO Streams. Next in these series, we’re going to take a look at a place we could use these operations in a real server appication: ASP.NET.
IHttpHandler
ASP.NET provides a lot of functionality to make building webpages easy. This functionality can be grouped into different layers. At the bottom layer lies the technology that we’re going to focus in this post:
IHttpHandler.
IHttpHandler is the base interface that is used to expose request to the webserver to ASP.NET. It comes in two flavors: IHttpHandler and IHttpAsyncHandler.
The reason for having the second version is exactly as described in the beginning of these series: scaling a webserver doesn’t work when blocking a whole thread for a single response.
In most webserver scenarios, a request will face many delays that are not processor bound, e.g.:
- Reading a file from (remote) disk
- Accessing a database
As such, implementing IHttpAsyncHandler is often the best way to go.
Unfortunately, implementations of this interface can get very nasty. The interface itself follows the begin/end invoke pattern and when making multiple asynchronous calls as described above, coordinating these calls can get a nightmare.
Let’s see if we can improve on this pattern.
ObservableHttpHandler
At heart of this interface is the following set of operations:
- For each request send to a specified URL of the webserver, a method is called that will be responsible for processing this request.
- This method will receive an HttpContext object. This object provides information on the request and allows the method to send a response.
- The method can spin up any amount of asynchronous work to process the request.
- Once all asynchronous work is done and the response is sent, the method needs to signal it is done processing the request.
In the Rx world, this can be modeled as follows:
public IObservable<Unit> ProcessRequestAsObservable(HttpContext context)
As ASP.NET doesn’t know about this pattern, we’ll have to convert between the Asynchronous pattern it knows about (IHttpAsyncHandler) and the pattern we prefer.
We’ll create an abstract base class that will do this work for us.
First, we’ll implement the IHttpAsyncHandler interface. We’ll do this explicitly so these methods don’t show up in our public API surface.
public abstract class ObservableHttpHandler : IHttpAsyncHandler
{
public abstract IObservable<Unit> ProcessRequestAsObservable(HttpContext context);
IAsyncResult IHttpAsyncHandler.BeginProcessRequest(HttpContext context,
AsyncCallback cb, object extraData)
{
...
}
void IHttpAsyncHandler.EndProcessRequest(IAsyncResult result)
{
...
}
bool IHttpHandler.IsReusable
{
get { ... }
}
void IHttpHandler.ProcessRequest(HttpContext context)
{
...
}
}
Let’s see how to implement these methods. We’ll start with the methods that IHttpAsyncHandler inherits from IHttpHandler:
IsReusable: return true as we don’t plan to maintain state in this class.
ProcessRequest: the method that is called if the request has to be processed synchronously. We will forward to the asynchronous version and block until the work has completed:
void IHttpHandler.ProcessRequest(HttpContext context)
{
var gate = new ManualResetEvent(false);
AsyncCallback cb = (_ => gate.Set());
((IHttpAsyncHandler)this).BeginProcessRequest(context, cb, null);
gate.WaitOne();
}
AsyncResult
As the BeginProcessRequest method follows the begin/endinvoke pattern, we’ll have to return something that implements the IAsyncResult interface. So let’s start off with a simple implementation of this interface:
private class SimpleAsyncResult : IAsyncResult
{
private readonly ManualResetEvent asyncWaitHandler;
private Exception exception;
public SimpleAsyncResult(object asyncState)
{
AsyncState = asyncState;
asyncWaitHandler = new ManualResetEvent(false);
}
public object AsyncState
{
get; private set;
}
public WaitHandle AsyncWaitHandle
{
get { return asyncWaitHandler; }
}
public bool CompletedSynchronously
{
get { return false; }
}
public bool IsCompleted
{
get;
private set;
}
internal void Success()
{
Complete();
}
internal void Fail(Exception exception)
{
this.exception = exception;
Complete();
}
private void Complete()
{
IsCompleted = true;
asyncWaitHandler.Set();
}
internal void CheckResult()
{
if (exception != null)
throw exception;
}
}
SimpleAsyncResult will allow us to communicate to ASP.NET that the request has finished processing as well as pass along any exception that happened along the way.
UPDATE: After posting this post, my coworker Stephen Toub from the TPL team pointed out, that instead of implementing SimpleAsyncResult, we could have reused Task as it also implements IAsyncResult.
I’ll leave it up as an exercise for the reader to do this.
BeginProcessRequest
The first thing we’ll do in BeginProcessRequest is to create an instance of this new class:
var asyncResult = new SimpleAsyncResult(extraData);
Next we need to know how to kick off the computation. We’ll call the ProcessRequestAsObservable method and store the observable:
var observable = ProcessRequestAsObservable(context);
Now it is time to kick off the computation by subscribing to the observable. As the observable only contains Unit values, we only care about completion either success or failure:
observable.Subscribe(
_ => { },
e=>
{
asyncResult.Fail(e);
cb(asyncResult);
},
() =>
{
asyncResult.Success();
cb(asyncResult);
});
In both cases, we signal completion to the SimpleAsyncResult and call the callback that ASP.NET provided us. In the case of failure, we do provide the SimpleAsyncResult with the exception that occurred.
Finally we’ll return the SimpleAsyncResult object back to ASP.NET:
return asyncResult;
EndProcessRequest
As the exception is now stuck as instance state in the SimpleAsyncResult instance, we’ll need to provide ASP.NET with a way to get the exception out of there. This is done by implementing the EndProcessRequest method:
((SimpleAsyncResult) result).CheckResult();
Complete Implementation
With that the complete implementation of ObservableHttpHandler is:
public abstract class ObservableHttpHandler : IHttpAsyncHandler
{
public abstract IObservable<Unit> ProcessRequestAsObservable(HttpContext context);
IAsyncResult IHttpAsyncHandler.BeginProcessRequest(HttpContext context,
AsyncCallback cb, object extraData)
{
var asyncResult = new SimpleAsyncResult(extraData);
var observable = ProcessRequestAsObservable(context);
observable.Subscribe(
_ => { },
e =>
{
asyncResult.Fail(e);
cb(asyncResult);
},
() =>
{
asyncResult.Success();
cb(asyncResult);
});
return asyncResult;
}
void IHttpAsyncHandler.EndProcessRequest(IAsyncResult result)
{
((SimpleAsyncResult)result).CheckResult();
}
bool IHttpHandler.IsReusable
{
get { return true; }
}
void IHttpHandler.ProcessRequest(HttpContext context)
{
var gate = new ManualResetEvent(false);
AsyncCallback cb = (_ => gate.Set());
((IHttpAsyncHandler)this).BeginProcessequest(context, cb, null);
gate.WaitOne();
}
private class SimpleAsyncResult : IAsyncResult
{
private readonly ManualResetEvent asyncWaitHandler;
private Exception exception;
public SimpleAsyncResult(object asyncState)
{
AsyncState = asyncState;
asyncWaitHandler = new ManualResetEvent(false);
}
public object AsyncState
{
get;
private set;
}
public WaitHandle AsyncWaitHandle
{
get { return asyncWaitHandler; }
}
public bool CompletedSynchronously
{
get { return false; }
}
public bool IsCompleted
{
get;
private set;
}
internal void Success()
{
Complete();
}
internal void Fail(Exception exception)
{
this.exception = exception;
Complete();
}
private void Complete()
{
IsCompleted = true;
asyncWaitHandler.Set();
}
internal void CheckResult()
{
if (exception != null)
throw exception;
}
}
}
Usage
Now that the implementation is done, it is time to take a look how we would use this new base class:
Imagine an HttpHandler that will asynchronously read in a big movie file, e.g. in 1mb chunks and sends each chunk to the client asynchronously:
public class BigVideoHandler : ObservableHttpHandler
{
public override IObservable<System.Unit> ProcessRequestAsObservable(HttpContext context)
{
var fs = new FileStream(@"d:\videos\video1.wmv", FileMode.Open,
FileAccess.Read, FileShare.Read, 2 << 19, true);
context.Response.ContentType = "video/x-ms-wmv";
return fs.AsyncRead(2 << 19).WriteToStream(context.Response.OutputStream);
}
}
NOTE: To use this HTTP Handler, it needs to be registered with the webserver. As this is different per webserver, please read this MSDN article on how to do so.
Even though many asynchronous operations are going on here, the code still looks very concise. The code inside ProcessRequestAsObservable can use any IObservable implementation and any of the Rx operators to orchestrate many asynchronous operations.
What’s next
The HttpHandler pattern is very powerful; it can be used to represent nearly any kind of HTTP request. As it is very low on the stack though, you’ll have to do a lot of manual work. Next in these series we’ll take a look how Rx can help out asynchronous programming in ASP.NET MVC.
Comments
Anonymous
September 15, 2010
Great article! I’d second the idea of using Task rather than implementing IAsyncResult. This APM pattern is notoriously difficult to get right. In this case you are always going to create an EventWaitHandle & then leave it for the finalizer to clean up. Task on the other hand, creates this handle in a lazy fashion and as you don’t use it, you won’t need to clean it up.Anonymous
September 15, 2010
In your ProcessRequest, shouldn't you be disposing the ManualResetEvent object at the end of the method? Great series, by the way.Anonymous
September 16, 2010
EndProcessRequest should be blocking (according to MSDN pattern for Begin/End* async methods) until completed. This would also allow you to implement ProcessRequest more easily with simple calls to BeginProcessRequest followed by ProcessRequest. Great article & series!Anonymous
September 17, 2010
I would really like to see an implementation using System.Threading.Task, not quite sure how it would work.Anonymous
October 19, 2010
Looking forward to your next post in the series with ASP.NET MVC.. soon?