Rx on the server, part 1 of n: Asynchronous System.IO.Stream reading

As it’s been a long time since I blogged, I figured I owe it to people subscribed to my blog to start a nice long series with regular posts. (Also I can’t stand seeing Matthew have all the fun :)).

There have been many samples showing Rx on the client. In these samples, Rx is often used for UI operations (such as Drag Drop) and/or network requests (e.g. dictionary suggest). What I haven’t seen much of is samples of using Rx on the server. Well it’s time to change that!

This series will focus on scenarios where you could use Rx on the server. I’ll start out with a couple of posts on reading and writing asynchronously from and to System.IO.Stream objects. After that I plan to discuss using Rx for asynchronous ASP.NET programming, followed by anything else that might come up.

This will be a pretty advanced series, probably at the 300 level. If I go too fast, feel free to post comments or ask questions on the Rx forum.

So with that, let’s jump right into part 1:

Asynchronous System.IO.Stream reading

First of all, why do we care about asynchronous reading? The Wikipedia page on Asynchronous I/O gives a good description of why: we don’t want to block on slow I/O operations. This can matter in a client application as well, but in a server application it quickly becomes critical. With hundreds of clients, servers often cannot afford to have a thread blocked on I/O.

So that’s where Asynchronous I/O comes in. The .NET BCL exposes this pattern through the BeginRead/EndRead and BeginWrite/EndWrite methods. While this is a better way of dealing with slow I/O from a runtime perspective, it isn’t really a good way of writing your code. As each separate asynchronous operation requires a Begin/End pair as well as a set of delegates, you can quickly end up with spaghetti code. So let’s see whether we can improve on this pattern with Rx.
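To make the "spaghetti" concrete, here is a minimal sketch of reading a whole stream with nothing but BeginRead/EndRead. The class and method names are made up for illustration, and a Task is used only as a convenient way to hand the final count back to the caller; it is not part of the Begin/End pattern itself.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class RawAsyncReadSketch
{
    // Counts the bytes in a stream using only BeginRead/EndRead.
    public static Task<long> CountBytesAsync(Stream stream, int bufferSize)
    {
        var buffer = new byte[bufferSize];
        var completion = new TaskCompletionSource<long>();
        long total = 0;

        // The callback has to refer to itself in order to start the next read,
        // so it is declared first and assigned afterwards.
        AsyncCallback onRead = null;
        onRead = asyncResult =>
        {
            try
            {
                int bytesRead = stream.EndRead(asyncResult);
                if (bytesRead == 0)
                {
                    completion.SetResult(total);   // end of stream
                    return;
                }

                total += bytesRead;
                // Only one read is in flight at a time: the next one starts here.
                // Assumption: callbacks are not invoked inline by BeginRead;
                // if they were, this would recurse once per chunk.
                stream.BeginRead(buffer, 0, buffer.Length, onRead, null);
            }
            catch (Exception ex)
            {
                completion.SetException(ex);
            }
        };

        stream.BeginRead(buffer, 0, buffer.Length, onRead, null);
        return completion.Task;
    }
}
```

Even in this small case the control flow (loop, termination, error handling) is spread over a self-referencing delegate, which is the part Rx is meant to tidy up.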

In order to give Rx access to the Asynchronous I/O operations, we’re first going to convert the BeginRead/EndRead pattern into an Rx pattern:

var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
    stream.BeginRead, stream.EndRead);

What does this mean? FromAsyncPattern takes something that follows the Begin*/End* pattern and converts it into a Func<arguments, IObservable<returnvalue>>. When somebody calls this Func<>, the following happens:

  • The arguments passed in to the returned Func<> are passed on to the Begin* method.
  • Additionally, the right steps are taken to hook up the Begin* call to a callback that is called once the asynchronous action has completed.
  • The func returns an IObservable<returnvalue>. The concrete implementation of this IObservable<returnvalue> is an AsyncSubject<returnvalue>.
  • Once the callback gets called, the return value of the asynchronous computation is retrieved by calling End*. This value is then sent to the AsyncSubject by calling OnNext & OnCompleted.

NOTE: The fact that the concrete implementation of the IObservable returned here is an AsyncSubject, means that this observable is ‘HOT’. The reason for this is that the async operation might have completed before you’re able to subscribe to the IObservable that you get back. Without an AsyncSubject, there could be a race condition in which you would miss the result.
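The steps above can be sketched without Rx at all. The code below is not the Rx implementation; it is a hand-rolled approximation in which SingleResultSubject, WrapRead and RecordingObserver are all hypothetical names. It only illustrates why caching the result removes the race: an observer that subscribes after the read has finished still receives the value.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

// Hypothetical stand-in for AsyncSubject<T>: caches the single outcome and
// replays it, so an observer that subscribes after completion still sees it.
// Simplification: unsubscribing is not supported (Dispose is a no-op).
public sealed class SingleResultSubject<T> : IObservable<T>, IDisposable
{
    private readonly object gate = new object();
    private readonly List<IObserver<T>> waiting = new List<IObserver<T>>();
    private bool done;
    private T value;
    private Exception error;

    public void Complete(T result) { Finish(result, null); }
    public void Fail(Exception ex) { Finish(default(T), ex); }
    public void Dispose() { }

    private void Finish(T result, Exception ex)
    {
        IObserver<T>[] toNotify;
        lock (gate)
        {
            if (done) return;
            done = true; value = result; error = ex;
            toNotify = waiting.ToArray();
            waiting.Clear();
        }
        foreach (var observer in toNotify) Notify(observer);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (gate)
        {
            if (!done) { waiting.Add(observer); return this; }
        }
        Notify(observer);   // late subscriber: replay the cached outcome
        return this;
    }

    private void Notify(IObserver<T> observer)
    {
        if (error != null) { observer.OnError(error); return; }
        observer.OnNext(value);
        observer.OnCompleted();
    }
}

public static class FromAsyncPatternSketch
{
    // Turns BeginRead/EndRead into a function that returns an observable.
    public static Func<byte[], int, int, IObservable<int>> WrapRead(Stream stream)
    {
        return (buffer, offset, count) =>
        {
            var subject = new SingleResultSubject<int>();
            // The read starts immediately, before anyone has subscribed.
            stream.BeginRead(buffer, offset, count, asyncResult =>
            {
                int bytesRead;
                try { bytesRead = stream.EndRead(asyncResult); }
                catch (Exception ex) { subject.Fail(ex); return; }
                subject.Complete(bytesRead);
            }, null);
            return subject;
        };
    }
}

// Hypothetical observer that records what it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public readonly ManualResetEventSlim Finished = new ManualResetEventSlim(false);
    public Exception Error;
    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { Error = error; Finished.Set(); }
    public void OnCompleted() { Finished.Set(); }
}
```

Calling WrapRead(stream)(buffer, 0, 4), waiting a while, and only then subscribing a RecordingObserver<int> still delivers exactly one value followed by completion.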

In the case of stream.BeginRead and stream.EndRead, the type arguments to FromAsyncPattern are the types of the first three arguments to BeginRead (array, offset, numBytes), followed by the type of the return value of EndRead: the number of bytes read from the stream, between 0 and the number of bytes you requested.

For this case, the return value of FromAsyncPattern will have the following signature:
Func<byte[], int, int, IObservable<int>>

So let’s see how we can use this new asyncRead function: 

// bufferSize > 64k as per BeginRead spec
// https://msdn.microsoft.com/en-us/library/zxt5ahzw.aspx
var bufferSize = 2 << 16;

// constructor of FileStream that enables asynchronous operations
// https://msdn.microsoft.com/en-us/library/7db28s3c.aspx
var stream = new FileStream(@"d:\temp\input.txt", FileMode.Open, FileAccess.Read,
    FileShare.Read, bufferSize, true);

var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
    stream.BeginRead, stream.EndRead);

var buffer = new byte[bufferSize];
var oiRead = asyncRead(buffer, 0, bufferSize);
oiRead.Run(bytesRead =>
    Console.WriteLine("The program read {0} bytes asynchronously", bytesRead));

Clearly the beginning of something beautiful :). Several things don’t look natural yet, though:

  • The observable yields only the number of bytes read; the data itself is state maintained outside of the operation (in the buffer).
  • User code will have to deal with partially filled buffers if fewer bytes were available than the buffer size.
  • The operation only reads once and will need to be repeated if the file is larger than bufferSize or if the stream wasn’t ready to provide all contents of the file.

Let’s try to tackle the first two points first:

We’ll factor out the code into an extension method on Stream and project the contents of the buffer into the observable:

public static IObservable<byte[]> AsyncRead(this Stream stream, int bufferSize)
{
    var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
        stream.BeginRead, stream.EndRead);

    var buffer = new byte[bufferSize];

    return asyncRead(buffer, 0, bufferSize).Select(readBytes =>
    {
        var newBuffer = new byte[readBytes];
        Array.Copy(buffer, newBuffer, readBytes);

        return newBuffer;
    });
}

This small refactoring has made the usage of the code quite a bit cleaner:

stream.AsyncRead(bufferSize).Run(bytes => DoSomethingWithData(bytes));

Now let’s try to tackle the last issue. We want to keep the signature we created above, but the IObservable<byte[]> should no longer fire OnNext just once. It should fire OnNext with as many byte[] instances as it takes to read the stream completely. The reason we want to fire several times, and not once at the end when all the data has been read, is that we’re potentially dealing with very big files (imagine reading a 50GB file in 1MB chunks).

One thing that could come to mind is to try to use the Repeat operator to accomplish this. Unfortunately that will not work, as the Repeat operator only works on ‘cold’ observables. If we were to use Repeat on the observable returned by asyncRead, it would indefinitely return the value produced by the first call to BeginRead.

So what can we do instead? We want to make repeated calls to asyncRead, retrieving new observables and concatenating them for as long as the number of bytes read is greater than 0. We also want to make sure only one asyncRead operation is in flight at a time, to maintain ordering. This is a pretty imperative operation that we want to perform in an asynchronous fashion. In Rx, the preferred way to do this kind of operation is to use the Iterate operator.

The Iterate Operator

One of the signatures for the Iterate operator is as follows:

public static IObservable<TResult> Iterate<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod)

A pretty complex signature. It takes a function that gets passed an IObserver<TResult> as an argument and returns an IEnumerable that yields values of IObservable<object>, and somehow turns it into an IObservable<TResult>.

The reason for this signature is to make Rx play nice with the C# Iterator pattern. It allows users to write asynchronous code, returning IObservables, while writing code that looks imperative. If you’re unfamiliar with the C# iterator pattern’s normal use case, I suggest you visit the link above first.

The way the C# Iterator pattern does its normal job is by taking user code that looks imperative, turning it inside out, and building a state-machine. The state-machine keeps track of the control-flow and invokes parts of the user code when it hits the right state. Normally a state transition happens when the code calling the C# Iterator pattern needs the next value from the IEnumerable (MoveNext). The state-machine runs a piece of the user code until the next yield return or yield break, after which control is returned to the code calling the state-machine.
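A small sketch makes this interleaving visible. The names below are invented for the example; the log simply records who runs when.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical example showing that iterator code only runs when MoveNext is called.
public static class IteratorSketch
{
    public static IEnumerable<int> Steps(List<string> log)
    {
        log.Add("before first yield");
        yield return 1;
        log.Add("between yields");
        yield return 2;
        log.Add("after last yield");
    }

    public static List<string> Trace()
    {
        var log = new List<string>();
        // Calling Steps and GetEnumerator only builds the state-machine;
        // none of the iterator body has run yet.
        using (var enumerator = Steps(log).GetEnumerator())
        {
            log.Add("caller: created");
            while (enumerator.MoveNext())
            {
                log.Add("caller: got " + enumerator.Current);
            }
        }
        return log;
    }
}
```

Trace() returns the entries in this order: "caller: created", "before first yield", "caller: got 1", "between yields", "caller: got 2", "after last yield". Each MoveNext runs the user code up to the next yield and then hands control back to the caller.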

We can use this for Rx as well. In this case, instead of yield returning a value, we yield return the Observable we’re waiting on to complete. The user code yields control, and the Iterate operator ensures that the user code gets control back the moment the Observable completes. As yield return is now taken up by our signaling of Observable completion, we need another way to produce values to the outside world. That’s where the IObserver argument comes in: it is our handle for producing values to the outside world.
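How an operator might hand control back to the iterator can be sketched in a few lines. This is not the Rx implementation of Iterate, just an approximation of the idea: Drive, Script and AlreadyCompleted are hypothetical names, and a plain log stands in for the IObserver that would carry values to the outside world.

```csharp
using System;
using System.Collections.Generic;

public static class IterateSketch
{
    // Advances the iterator one step, subscribes to the observable it yields,
    // and only calls MoveNext again once that observable has completed.
    public static void Drive(IEnumerator<IObservable<object>> steps,
                             Action onDone, Action<Exception> onError)
    {
        bool hasNext;
        try { hasNext = steps.MoveNext(); }
        catch (Exception ex) { steps.Dispose(); onError(ex); return; }

        if (!hasNext) { steps.Dispose(); onDone(); return; }

        steps.Current.Subscribe(new StepObserver(
            () => Drive(steps, onDone, onError),
            ex => { steps.Dispose(); onError(ex); }));
    }

    private sealed class StepObserver : IObserver<object>
    {
        private readonly Action resume;
        private readonly Action<Exception> fail;

        public StepObserver(Action resume, Action<Exception> fail)
        {
            this.resume = resume;
            this.fail = fail;
        }

        public void OnNext(object value) { }               // values are ignored here
        public void OnError(Exception error) { fail(error); }
        public void OnCompleted() { resume(); }            // give control back to the iterator
    }

    // Hypothetical observable that completes as soon as it is subscribed to.
    public sealed class AlreadyCompleted : IObservable<object>, IDisposable
    {
        public IDisposable Subscribe(IObserver<object> observer)
        {
            observer.OnCompleted();
            return this;
        }

        public void Dispose() { }
    }

    // User code in the imperative-looking style described above.
    public static IEnumerable<IObservable<object>> Script(List<string> log)
    {
        log.Add("step 1");
        yield return new AlreadyCompleted();
        log.Add("step 2");
        yield return new AlreadyCompleted();
    }
}
```

Driving Script with a completion callback that logs "done" produces "step 1", "step 2", "done": the second step only runs after the first yielded observable has signaled completion.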

So let’s see the Iterate operator in action:

As C# currently doesn’t support the Iterator pattern inside lambdas, we’ll have to call out to a helper method first:

static IObservable<byte[]> AsyncRead(this Stream stream, int bufferSize)
{
    return Observable.Iterate<byte[]>(result =>
        AsyncReadHelper(result, stream, bufferSize));
}

Next we’ll build the skeleton for our helper:

private static IEnumerable<IObservable<object>> AsyncReadHelper(
    IObserver<byte[]> result, Stream stream, int bufferSize)
{
    var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
        stream.BeginRead, stream.EndRead);
    var buffer = new byte[bufferSize];

    while (true)
    {

    }
}

What to put inside the while? In order to match the method’s signature, it’ll have to be something like this:

yield return asyncRead(buffer, 0, bufferSize);

This will yield control back to the Iterate operator until the asyncRead completes, after which the next line of the method is executed. There are, however, several problems with that:

  • asyncRead returns IObservable<int>. As int is a value type, and .NET generic variance does not apply to value types, an IObservable<int> cannot be converted to the IObservable<object> the iterator has to yield.
  • We’re not only interested in the completion of the asyncRead; we’d also like to know how many bytes were read, so we can size our output array accordingly.
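The variance point can be checked with a small sketch. Empty<T> is a hypothetical do-nothing observable used only to have something to convert; the run-time checks mirror what the compiler allows.

```csharp
using System;

public static class VarianceSketch
{
    // Hypothetical observable that completes immediately; only its type matters here.
    private sealed class Empty<T> : IObservable<T>, IDisposable
    {
        public IDisposable Subscribe(IObserver<T> observer)
        {
            observer.OnCompleted();
            return this;
        }

        public void Dispose() { }
    }

    public static bool ReferenceTypeConverts()
    {
        IObservable<string> strings = new Empty<string>();
        IObservable<object> objects = strings;   // compiles: covariance works for reference types
        object boxed = objects;
        return boxed is IObservable<object>;
    }

    public static bool ValueTypeConverts()
    {
        // IObservable<object> objects = new Empty<int>();   // does not compile
        object ints = new Empty<int>();
        return ints is IObservable<object>;      // the run-time check fails as well
    }
}
```

ReferenceTypeConverts() returns true and ValueTypeConverts() returns false, which is why the IObservable<int> produced by asyncRead cannot be yielded directly.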

This is where the Start Operator comes in:

public static ListObservable<TSource> Start<TSource>(
    this IObservable<TSource> source)

This operator takes any IObservable<T> and turns it into an object that implements IList<T> and IObservable<object>. As the source observable fires out values on OnNext, it caches these values and makes them available through the IList<T> implementation. Furthermore, it fires OnCompleted on the IObservable<object> interface once OnCompleted comes in on the source observable.

With the Start operator, we can now finish our AsyncReadHelper implementation: 

private static IEnumerable<IObservable<object>> AsyncReadHelper(
    IObserver<byte[]> result, Stream stream, int bufferSize)
{
    var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
        stream.BeginRead, stream.EndRead);
    var buffer = new byte[bufferSize];

    while (true)
    {
        var read = asyncRead(buffer, 0, bufferSize).Start();
        yield return read;

        // As asyncRead returns an AsyncSubject,
        // and the Iterate operator takes care of exceptions,
        // we know at this point there will always be exactly one value in the list.
        var bytesRead = read[0];
        if (bytesRead == 0)
        {
            // End of file
            yield break;
        }

        var outBuffer = new byte[bytesRead];
        Array.Copy(buffer, outBuffer, bytesRead);

        // Fire out to the outer Observable
        result.OnNext(outBuffer);
    }
}

And with that we have an extension method that will read the whole file asynchronously, exposing the data as an IObservable<byte[]>, opening a world of possibilities of windowing, grouping, and other operations we can do on the data.

Temperature of AsyncRead

Our original version of AsyncRead was hot; our new version is neither hot nor cold, more like lukewarm. Even though code created using the Iterate operator is generally cold, this is not the case here. As the System.IO.Stream object in .NET maintains state, so does our usage of it: the stream keeps track of the position in the file. If we were to subscribe two observers to this operator, unexpected things would happen. First, it depends on whether the specific Stream even supports two asynchronous reads at the same time. If it does, it will be a race: some of the data will go to the first observer and some to the second. Because of this it is best to attach only one observer to AsyncRead. If you really want to attach multiple observers, use the Publish operator on the resulting observable before subscribing.

What’s next?

So far we’ve seen how to use some advanced Rx operators to create an operator that will read in a file asynchronously and create an IObservable<byte[]> to signal the availability of the data. An operation that can be very useful on servers. Imagine an implementation of IHttpAsyncHandler that wants to dynamically read in a file on a slow network share. 

A set of byte[] might not be the most convenient way to look at the contents of a file though, just as in the synchronous world one would often prefer a StreamReader with functions like ReadLine over reading a byte[]. Next in this series we’ll talk about creating an asynchronous version of StreamReader functionality.

Comments

  • Anonymous
    July 22, 2010
    Interesting. When I did something similar, I solved the last issue by using Repeat combined with the Defer operator. It appeared to work quite well but perhaps I haven't tested it enough.

  • Anonymous
    July 22, 2010
    Heady stuff! I've got it now, but it took me several readings. It would definitely be worth your while stepping through more slowly  the interaction between the Iterate method and its iteratorMethod - AsyncReadHelper in this case.

  • Anonymous
    July 23, 2010
    Great article. I too was using the Defer + Repeat instead of the iterator pattern to solve this.  Looking forward to the next articles in this series (maybe something on Sockets?)

  • Anonymous
    August 02, 2010
    Hi, Great article. How does one do this using VB10?

  • Anonymous
    January 08, 2012
    Great article Jeffrey. I appreciate that this was written some time ago, but for new readers, this code will not work with the current version of Rx. If you want to find an implementation that does work with the current version of Rx, it looks like the guys on the Rxx project have got it working. Check out http://Rxx.Codeplex.com. As it is an open source project you can see the implementation for yourself too. Lee

  • Anonymous
    February 18, 2015
    Hey Lee, what is the working code? Or a url on rxx to that code? it's a lot of code to wade thru. Thanks!

  • Anonymous
    February 18, 2015
    ok I think I found it. Download the code and go to 1.3SourceRxxSystemIOStreamExtensions.cs

  • Anonymous
    February 18, 2015
    The entry function (extension on Stream) is: public static IObservable<byte[]> ReadToEndObservable(this Stream stream)