Rx on the server, part 2 of n: Asynchronous StreamReader

In part 1 of this series, we looked at how to read a stream asynchronously. The result was an operator returning IObservable<byte[]>. That type is useful in many scenarios, e.g. when you’re processing a large image. However, there are plenty of scenarios where you want to retrieve the content of a file in a more meaningful form. Imagine parsing a huge CSV file: there we’d like a type like IObservable<string>, where each OnNext call delivers one whole line of the file. In this part of the series, we’ll look at how to write an operator with that signature.

StreamReader

The .NET BCL supports the synchronous version of this pattern through the StreamReader class. Unfortunately, in .NET 4 this class doesn’t provide asynchronous versions of methods like ReadLine, so we’ll have to cook something up ourselves.

That doesn’t mean, of course, that we can’t learn from the implementation of StreamReader. So we’re off to the reference source to look at DEVDIV_TFS\Dev10\Releases\RTMRel\ndp\clr\src\BCL\System\IO\StreamReader.cs\1305376\StreamReader.cs. There we find a couple of interesting things about StreamReader and ReadLine:

  • It uses a buffer that is filled by reads from the underlying stream.
  • It uses the Encoding class to convert from byte[] to strings.
  • Because a single encoded character can span two buffer reads, a lot of logic deals with that case.
  • When ReadLine is called, it walks through the buffer to find the next line terminator (\r, \r\n or \n). When it finds one, or reaches the end of the file, it builds a string and returns it.
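
For reference, here is the synchronous behaviour we want to reproduce, as a small sketch that reads lines from an in-memory stream with StreamReader.ReadLine. SyncReadLinesDemo is just an illustrative name, and ASCII is assumed to match the rest of this post:

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text;

// Synchronous baseline: the line semantics AsyncReadLines aims to reproduce.
public static class SyncReadLinesDemo
{
    // ReadLine treats "\r", "\r\n" and "\n" as terminators and returns null at end of stream.
    public static List<string> ReadAllLines(Stream stream)
    {
        var lines = new List<string>();
        using (var reader = new StreamReader(stream, Encoding.ASCII))
        {
            string line;
            while ((line = reader.ReadLine()) != null)
            {
                lines.Add(line);
            }
        }
        return lines;
    }
}
```

With the input "a\r\nb\rc\nd\n" this yields the four lines a, b, c and d: all three terminators are recognized, and the trailing newline does not produce an extra empty line, because ReadLine returns null once the stream is exhausted.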

 

Let’s translate this to the asynchronous pattern. We’ll ignore the encoding part for now, as it would clutter the sample, and assume ASCII, where every byte is exactly one character.

 

  • We’ll use the AsyncRead method from the previous part to push blocks of bytes to us as they are read asynchronously.
  • We’ll return an IObservable<string> that pushes out the lines of the file.
  • We’ll build up a buffer of characters until we hit a newline or the end of the file.
  • When we hit either, we call OnNext to push out the line we built up, and then clear the buffer.
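
Before wiring this into Rx, the plan can be tried out synchronously. The sketch below is a hypothetical ChunkedLineSplitter (not part of the BCL or Rx) that is fed text in arbitrary chunks, the way AsyncRead delivers blocks, and produces lines as soon as a terminator is seen. It carries one extra bit of state across chunks, whether the previous chunk ended in \r, because a \r\n pair can be split between two reads:

```csharp
using System.Collections.Generic;
using System.Text;

// Synchronous sketch of the buffering plan (hypothetical helper, not an Rx or BCL type).
public sealed class ChunkedLineSplitter
{
    private readonly StringBuilder _buffer = new StringBuilder();
    private bool _lastWasCR; // survives across chunks so a "\r" | "\n" split is one terminator

    // Feeds one chunk and returns the lines it completes.
    public List<string> Push(string chunk)
    {
        var lines = new List<string>();
        foreach (var c in chunk)
        {
            if (c == '\n' && _lastWasCR)
            {
                // Second half of "\r\n": the line was already emitted at the '\r'.
                _lastWasCR = false;
                continue;
            }
            _lastWasCR = c == '\r';
            if (c == '\r' || c == '\n')
            {
                lines.Add(_buffer.ToString());
                _buffer.Length = 0;
            }
            else
            {
                _buffer.Append(c);
            }
        }
        return lines;
    }

    // End of input: returns the unterminated last line, or null if there is none.
    public string Flush()
    {
        if (_buffer.Length == 0) return null;
        var last = _buffer.ToString();
        _buffer.Length = 0;
        return last;
    }

    // Convenience wrapper: splits a whole sequence of chunks into lines.
    public static List<string> Split(IEnumerable<string> chunks)
    {
        var splitter = new ChunkedLineSplitter();
        var result = new List<string>();
        foreach (var chunk in chunks) result.AddRange(splitter.Push(chunk));
        var tail = splitter.Flush();
        if (tail != null) result.Add(tail);
        return result;
    }
}
```

For example, ChunkedLineSplitter.Split(new[] { "ab\r", "\ncd\nef" }) returns ab, cd and ef: the \n at the start of the second chunk completes the \r\n pair instead of ending an empty line, and Flush hands back the unterminated ef.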

 

So let’s get started. First we define the signature of this method:

 

public static IObservable<string> AsyncReadLines(
this Stream stream, int bufferSize)

 

Next, we’re going to write a buffering IObservable on top of the IObservable returned by AsyncRead. This kind of operation can often be expressed as a transformation of the source observable, e.g. with the Scan operator. However, because we also need to do some work when the source observable completes (at the end of the file), it’s easiest to write a custom implementation using Observable.CreateWithDisposable.

Observable.CreateWithDisposable & Observable.Create

These methods are convenient helpers that let you write concrete implementations of IObservable without the clutter of declaring a separate class. The signature of Observable.CreateWithDisposable is the following:

 

public static IObservable<TSource> CreateWithDisposable<TSource>(
Func<IObserver<TSource>, IDisposable> subscribe)

 

Now, if you look at the IObservable interface definition:

 

public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

 

You’ll see that the signature of the Func<> passed into CreateWithDisposable looks a lot like the signature of the Subscribe method of the IObservable interface. That’s no coincidence: Observable.CreateWithDisposable is a trick we use in Rx to work around the fact that C# doesn’t support anonymous inner classes (C# anonymous types can’t implement interfaces). In a language that does support them, such as Java, you could write something like this:

 

// Pseudocode in the style of Java's anonymous inner classes; this does not compile in C#.
public static IObservable<string> AsyncReadLines(
    this Stream stream, int bufferSize)
{
    return new IObservable<string>
    {
        public IDisposable Subscribe(IObserver<string> observer)
        {
            ...
        }
    };
}

 

Working around the lack of anonymous inner classes, we can write this instead:

 

public static IObservable<string> AsyncReadLines(
    this Stream stream, int bufferSize)
{
    return Observable.CreateWithDisposable<string>(observer =>
    {
        ...
    });
}
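
To make the trick concrete, here is a minimal sketch of how such a helper could be built from the BCL interfaces alone. AnonymousObservable, ObservableSketch, NoopDisposable and RecordingObserver are hypothetical names for illustration; Rx's real implementation does more, including the observer wrapping described next:

```csharp
using System;
using System.Collections.Generic;

// Implements IObservable<T> by forwarding Subscribe to a delegate: the
// "anonymous inner class" C# lacks, written once as a named class.
public sealed class AnonymousObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;

    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        if (subscribe == null) throw new ArgumentNullException(nameof(subscribe));
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

public static class ObservableSketch
{
    // Same shape as the CreateWithDisposable signature shown earlier.
    public static IObservable<T> CreateWithDisposable<T>(Func<IObserver<T>, IDisposable> subscribe)
    {
        return new AnonymousObservable<T>(subscribe);
    }
}

// A subscription with nothing to clean up.
public sealed class NoopDisposable : IDisposable
{
    public static readonly NoopDisposable Instance = new NoopDisposable();
    private NoopDisposable() { }
    public void Dispose() { }
}

// Records notifications, handy for trying the sketch out.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public bool Completed;
    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}
```

Passing a lambda such as o => { o.OnNext(1); o.OnCompleted(); return NoopDisposable.Instance; } yields an observable whose Subscribe simply runs that lambda against the observer it is given.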

 

Besides providing a convenient way to write Observable implementations inline, Observable.CreateWithDisposable provides another handy feature that would otherwise require a lot of plumbing: it wraps any observer passed into Subscribe in an observer that does two nifty things:

· When the observable sequence is done (either by firing OnError or OnCompleted), it automatically disposes the subscription, cleaning up any state as soon as possible.

· It ensures that once OnError or OnCompleted has been called, no further messages are sent to OnNext, OnError or OnCompleted, enforcing the Rx contract OnNext* (OnError|OnCompleted)?
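
A hand-rolled wrapper providing those two guarantees might look like the sketch below. SafeObserver, SafeSubscribe, FlagDisposable and RecordingObserver are hypothetical names, not Rx's internal types. The code assumes notifications arrive one at a time (as the Rx contract requires), so it takes no locks. SetSubscription exists because a source may terminate synchronously, before Subscribe has even returned its IDisposable:

```csharp
using System;
using System.Collections.Generic;

// Wraps an observer so that (1) nothing reaches it after a terminal message and
// (2) the subscription is disposed as soon as that terminal message arrives.
// Assumes serialized notifications; not thread-safe.
public sealed class SafeObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private IDisposable _subscription;
    private bool _stopped;

    public SafeObserver(IObserver<T> inner) { _inner = inner; }

    // Called once the source's Subscribe has returned its IDisposable.
    public void SetSubscription(IDisposable subscription)
    {
        _subscription = subscription;
        if (_stopped) subscription.Dispose(); // source already terminated synchronously
    }

    public void OnNext(T value)
    {
        if (!_stopped) _inner.OnNext(value);
    }

    public void OnError(Exception error)
    {
        if (_stopped) return;
        _stopped = true;
        try { _inner.OnError(error); }
        finally { if (_subscription != null) _subscription.Dispose(); }
    }

    public void OnCompleted()
    {
        if (_stopped) return;
        _stopped = true;
        try { _inner.OnCompleted(); }
        finally { if (_subscription != null) _subscription.Dispose(); }
    }
}

public static class SafeSubscribe
{
    // Runs a raw subscribe function with the observer wrapped in SafeObserver.
    public static IDisposable Run<T>(Func<IObserver<T>, IDisposable> subscribe, IObserver<T> observer)
    {
        var safe = new SafeObserver<T>(observer);
        var subscription = subscribe(safe);
        safe.SetSubscription(subscription);
        return subscription;
    }
}

// Test helpers: a disposable that records disposal and an observer that records calls.
public sealed class FlagDisposable : IDisposable
{
    public bool IsDisposed { get; private set; }
    public void Dispose() { IsDisposed = true; }
}

public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();
    public int CompletedCount;
    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { CompletedCount++; }
}
```

With this wrapper, a source that calls OnNext after OnCompleted, or completes twice, only gets its first OnNext and its first OnCompleted through, and the resource it returned is disposed.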

 

Observable.Create is almost the same as Observable.CreateWithDisposable. The only difference is that the subscribe Func<> returns an Action instead of an IDisposable. The idea is the same: rather than writing an implementation of IDisposable, you return an Action that serves as an inline version of IDisposable.Dispose.
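
As a sketch of how the two flavours relate, the Action-returning version can be layered on the IDisposable-returning one by adapting the Action into an IDisposable. ActionDisposable, DelegateObservable and CreateSketch are hypothetical names, not Rx types:

```csharp
using System;
using System.Threading;

// Adapts an Action into an IDisposable; the action runs at most once.
public sealed class ActionDisposable : IDisposable
{
    private Action _dispose;

    public ActionDisposable(Action dispose) { _dispose = dispose; }

    public void Dispose()
    {
        // Interlocked.Exchange hands the action to exactly one caller, even under concurrency.
        var action = Interlocked.Exchange(ref _dispose, null);
        if (action != null) action();
    }
}

// Forwards Subscribe to a delegate.
public sealed class DelegateObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;
    public DelegateObservable(Func<IObserver<T>, IDisposable> subscribe) { _subscribe = subscribe; }
    public IDisposable Subscribe(IObserver<T> observer) { return _subscribe(observer); }
}

public static class CreateSketch
{
    // IDisposable flavour.
    public static IObservable<T> CreateWithDisposable<T>(Func<IObserver<T>, IDisposable> subscribe)
    {
        return new DelegateObservable<T>(subscribe);
    }

    // Action flavour: wraps the returned Action so callers still receive an IDisposable.
    public static IObservable<T> Create<T>(Func<IObserver<T>, Action> subscribe)
    {
        return CreateWithDisposable<T>(observer => new ActionDisposable(subscribe(observer)));
    }
}
```

Disposing the resulting subscription twice runs the cleanup Action only once, which matches the usual expectation that Dispose is idempotent.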

 

AsyncReadLines Implementation

OK, let’s get back to the problem at hand. We’ve got the skeleton of our method; now let’s fill in the details.

First, we call our AsyncRead method from the previous post:

 

var blocks = AsyncRead(stream, bufferSize);

This still gives us byte[] blocks, and we decided to assume ASCII, so let’s convert each block to a string:

 

var blocks = AsyncRead(stream, bufferSize)
.Select(block=>Encoding.ASCII.GetString(block));
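
Per-block GetString is only safe here because ASCII maps every byte to exactly one character. With a multi-byte encoding such as UTF-8, one character can be split across two blocks, and decoding each block independently turns both halves into replacement characters. A hedged sketch of the usual remedy is a single stateful System.Text.Decoder that carries leftover bytes from one call to the next (BlockDecoder is a hypothetical helper name):

```csharp
using System.Collections.Generic;
using System.Text;

public static class BlockDecoder
{
    // Decodes each block with one shared Decoder, which buffers the bytes of an
    // incomplete character until the next block supplies the rest.
    public static List<string> DecodeBlocks(IEnumerable<byte[]> blocks, Encoding encoding)
    {
        var decoder = encoding.GetDecoder();
        var result = new List<string>();
        foreach (var block in blocks)
        {
            // GetCharCount accounts for bytes left over from the previous block.
            var chars = new char[decoder.GetCharCount(block, 0, block.Length)];
            var count = decoder.GetChars(block, 0, block.Length, chars, 0);
            result.Add(new string(chars, 0, count));
        }

        // End of input: flush, so a truncated trailing sequence becomes U+FFFD
        // instead of being silently dropped.
        var empty = new byte[0];
        var tail = new char[decoder.GetCharCount(empty, 0, 0, true)];
        var tailCount = decoder.GetChars(empty, 0, 0, tail, 0, true);
        if (tailCount > 0) result.Add(new string(tail, 0, tailCount));
        return result;
    }
}
```

Splitting the UTF-8 bytes of "aéb" (61 C3 A9 62) inside the é gives "a" and "éb" with the decoder, whereas Encoding.UTF8.GetString on each block gives "a\uFFFD" and "\uFFFDb".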

 

Next, we want to do something with the data inside the source IObservable. The only way to get at that data is to subscribe to it:

 

var subscription = blocks.Subscribe(data =>
    {
        ...
    },
    exception =>
    {
        ...
    },
    () =>
    {
        ...
    });
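
The three-lambda Subscribe call is itself a convenience: the lambdas are packaged into an IObserver<T> and passed to the interface’s single Subscribe method. Here is a sketch of that idea with hypothetical names (DelegateObserver, SubscribeSketch, ArraySource, NoopDisposable); Rx ships its own Subscribe extension methods for this:

```csharp
using System;

// Packages three callbacks into an IObserver<T>.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { _onError(error); }
    public void OnCompleted() { _onCompleted(); }
}

public static class SubscribeSketch
{
    // The lambda-based overload is just sugar over IObservable<T>.Subscribe(IObserver<T>).
    public static IDisposable Subscribe<T>(this IObservable<T> source,
        Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        return source.Subscribe(new DelegateObserver<T>(onNext, onError, onCompleted));
    }
}

// Tiny synchronous source for trying this out: emits its items, then completes.
public sealed class ArraySource<T> : IObservable<T>
{
    private readonly T[] _items;
    public ArraySource(params T[] items) { _items = items; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return NoopDisposable.Instance;
    }
}

public sealed class NoopDisposable : IDisposable
{
    public static readonly NoopDisposable Instance = new NoopDisposable();
    private NoopDisposable() { }
    public void Dispose() { }
}
```

Calling new ArraySource<int>(1, 2, 3).Subscribe(onNext, onError, onCompleted) invokes onNext three times and then onCompleted once.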

 

We also have to decide how the user unsubscribes the observer they passed to our observable. When the user unsubscribes from our observable, we need to dispose the subscription we made to the source observable, so we simply return that subscription from our subscribe function:

 

return blocks.Subscribe(data =>
    {
        ...
    },
    exception =>
    {
        ...
    },
    () =>
    {
        ...
    });

 

Now that we’re hooked up to the source observable, we get a call every time new data is available. This data is a string that may contain several lines, or only part of one. So let’s write some code to parse that string. We walk through the string looking for \r, \r\n or \n. If we find one of those, we’re at the end of a line and have to take some action; otherwise, we append the character to a temporary buffer (a StringBuilder). Because a block can end between the \r and the \n of a \r\n pair, we also remember whether the last character we saw was a \r, so that a \n at the start of the next block isn’t mistaken for a second, empty line.

var sb = new StringBuilder();
// Remembers a '\r' at the end of the previous block, so a "\r\n" pair split
// across two blocks is still treated as a single line terminator.
var lastWasCR = false;

for (var i = 0; i < data.Length; i++)
{
    var c = data[i];
    if (c == '\n' && lastWasCR)
    {
        // Second half of "\r\n": the line was already produced at the '\r'.
        lastWasCR = false;
        continue;
    }
    lastWasCR = c == '\r';
    if (c == '\r' || c == '\n')
    {
        produceCurrentLine();
    }
    else
    {
        sb.Append(c);
    }
}

 

Before we look at what to do when we see an end of line (the produceCurrentLine call), let’s look at what we should do when OnError or OnCompleted comes through on the source observable.

Rx uses abort semantics for exceptions, so we pass any exception that comes in on the source IObservable straight through to the subscribing observer:

 

exception =>
{
    observer.OnError(exception);
}

This can be simplified by converting the statement lambda into a method group:

 

observer.OnError

 

For OnCompleted we have to do a bit more work. Imagine a file that doesn’t end with a newline: we still want to send out that last line. So when our source signals OnCompleted, we do the same work as when we hit a newline character inside our OnNext handler, sharing that code. We only do this if the buffer isn’t empty; otherwise a file that does end with a newline would produce a spurious empty last line (StreamReader.ReadLine doesn’t do that either). After we’ve produced the current line, we fire OnCompleted.

() =>
{
    if (sb.Length > 0)
    {
        produceCurrentLine();
    }
    observer.OnCompleted();
});

 

Now let’s implement produceCurrentLine. It takes the string out of the buffer, resets the buffer and sends the string to OnNext:

 

Action produceCurrentLine = () =>
{
    var text = sb.ToString();
    sb.Length = 0;
    observer.OnNext(text);
};

And with that we have the full implementation of our AsyncReadLines operator:

 

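public static IObservable<string> AsyncReadLines(
    this Stream stream, int bufferSize)
{
    return Observable.CreateWithDisposable<string>(observer =>
    {
        var sb = new StringBuilder();
        // True when the previous character, possibly the last one of the previous
        // block, was '\r'; lets us recognize a "\r\n" pair split across blocks.
        var lastWasCR = false;

        // AsyncRead is the operator from part 1 of this series.
        var blocks = AsyncRead(stream, bufferSize)
            .Select(block => Encoding.ASCII.GetString(block));

        // Pushes the buffered line to the observer and clears the buffer.
        Action produceCurrentLine = () =>
        {
            var text = sb.ToString();
            sb.Length = 0;
            observer.OnNext(text);
        };

        return blocks.Subscribe(data =>
        {
            for (var i = 0; i < data.Length; i++)
            {
                var c = data[i];
                if (c == '\n' && lastWasCR)
                {
                    // Second half of "\r\n": the line was already produced at '\r'.
                    lastWasCR = false;
                    continue;
                }
                lastWasCR = c == '\r';
                if (c == '\r' || c == '\n')
                {
                    produceCurrentLine();
                }
                else
                {
                    sb.Append(c);
                }
            }
        },
        // Errors abort the sequence and are passed straight through.
        observer.OnError,
        () =>
        {
            // Emit a last line that has no terminator, but don't turn a
            // trailing newline into an extra empty line.
            if (sb.Length > 0)
            {
                produceCurrentLine();
            }
            observer.OnCompleted();
        });
    });
}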

 

The following program shows how to use this operator:

 

// bufferSize > 64k as per BeginRead spec: https://msdn.microsoft.com/en-us/library/zxt5ahzw.aspx
// (2 << 16 = 131072 bytes, i.e. 128 KB)
var bufferSize = 2 << 16;

// FileStream constructor overload whose last argument (useAsync: true) enables asynchronous I/O.
// Run blocks until the sequence has completed, so the stream can safely be disposed afterwards.
using (var stream = new FileStream(@"d:\temp\input.txt", FileMode.Open, FileAccess.Read, FileShare.Read, bufferSize, true))
{
    stream.AsyncReadLines(bufferSize).Run(Console.WriteLine);
}

 

What’s next?

In this series, we’ve seen how to read from a file asynchronously. In this post, we learned how to create an operator that buffers data and pushes it out in useful chunks. Next, we’ll tackle writing to a file asynchronously and see what specific challenges that operation brings.

Comments

  • Anonymous
    August 01, 2010
    Great post! I have one question: what if the "data => ..." lambda expression is called concurrently?

  • Anonymous
    August 02, 2010
    The Rx contract requires IObservable implementations to fire messages in a serialized fashion. When encountering an IObservable implementation that does not follow this rule, the .Synchronize operator will fix the badly behaving object.

  • Anonymous
    December 10, 2010
    Jeffrey, Although this is a small detail, I noticed that either the MSDN documentation is incorrect or the BeginRead() operator no longer requires the buffer size to be greater than 64 KB in order to be asynchronous as your comments and URL indicate. I reviewed the MSDN documentation for the BeginRead() spec and noticed that the 64 KB buffer size requirement is needed for BeginRead() to be asynchronous in .NET versions 1.1, 2.0, and 3.0, but not 3.5, nor 4.0. Scott

  • Anonymous
    December 27, 2010
    warning: this doesn't work for UTF8 since multi-byte characters can span the blocks returned from the underlying stream. the GetString call doesn't correctly handle this case.