Rx on the server, part 3 of n: Writing an observable to a stream

Previously in this series we saw:

Next, I would like us to take a look at writing an observable to a stream asynchronously.

Requirements

When we write an observable to a stream, there are several issues we have to deal with:

1. We want to write all data in the observable to the file.

2. These values should be written to the file in order.

3. We want to be notified once the whole file has been written.

From 1 and 3 we can derive the signature for our method:

 

public static IObservable<Unit> WriteToStream(
    this IObservable<byte[]> source, Stream stream)

 

Data Corruption

Point 2 is interesting. Given that we are in an asynchronous world, we have to be very careful about potential data corruption. We want to make sure that if one piece of data is in the process of being written to a file, the next piece of data that comes in doesn’t corrupt the first write.

 

When doing the research for this post, I initially wrote a Drain operator that would avoid these kinds of issues.

 

I was almost ready to post about this operator, when I found something interesting that was deeply buried inside the MSDN documentation for BeginWrite:

 

“The current position in the stream is updated when you issue the asynchronous read or write, not when the I/O operation completes. Multiple simultaneous asynchronous requests render the request completion order uncertain.”

In other words, the position of the stream is updated synchronously in the call to BeginWrite. This means that a chunk of the file, exactly the size of your data, is reserved for each call to BeginWrite. This ensures that simultaneous writes do not corrupt each other’s data.

 

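Because of the following two guarantees that Rx gives us:

· The grammar is OnNext* (OnCompleted | OnError)?

· All calls to IObserver are serialized.

we can safely call BeginWrite any time a message comes in. OnNext calls never overlap, so the calls to BeginWrite are issued one at a time, and each one reserves its region of the file in the order the data arrived.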

 

The second sentence of the quote talks about the completion order. The uncertain order is not important for us. We are only interested in knowing that all operations have completed.

 

Out with the Drain operator

Given these new facts, we can throw away the Drain operator. I’ll keep the code around and will blog about it in the future, as I’m sure there are scenarios where we will need it.

 

So let’s see how to code up the solution without Drain.

BeginWrite

First we’re going to need an observable version of BeginWrite:

 

var asyncWrite = Observable.FromAsyncPattern<
    byte[], int, int>(stream.BeginWrite, stream.EndWrite);
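To make the shape of this wrapper explicit, here is a minimal sketch. The function returned by FromAsyncPattern starts the write when it is invoked and hands back an observable that produces a single Unit and then completes once EndWrite returns. The class and method names (AsyncWriteSketch, CreateAsyncWrite, WriteChunk) are hypothetical and exist only for illustration. The second method is an assumption about what you might use on frameworks where Stream.WriteAsync is available; it is not part of the original solution.

```csharp
using System;
using System.IO;
using System.Reactive;
using System.Reactive.Linq;

public static class AsyncWriteSketch
{
    // Hypothetical helper: wraps the stream's BeginWrite/EndWrite pair.
    // Invoking the returned function starts the write immediately.
    public static Func<byte[], int, int, IObservable<Unit>> CreateAsyncWrite(
        Stream stream)
    {
        return Observable.FromAsyncPattern<byte[], int, int>(
            stream.BeginWrite, stream.EndWrite);
    }

    // Hypothetical Task-based alternative (assumes Stream.WriteAsync exists).
    // Unlike the function above, Observable.FromAsync only starts the write
    // when the returned observable is subscribed to.
    public static IObservable<Unit> WriteChunk(Stream stream, byte[] data)
    {
        return Observable.FromAsync(
            () => stream.WriteAsync(data, 0, data.Length));
    }
}
```

Either form gives us an IObservable<Unit> per write, which is all the rest of this post relies on.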

 

Next we want to call asyncWrite for each block of data that comes in to our source observable.

SelectMany

The preferred way to create new asynchronous operations from an observable is to use the SelectMany operator. (SelectMany has other fine uses, but this is a common one.)

 

You can skip this section if you’re already familiar with SelectMany.

 

The SelectMany operator takes a selector function argument. This selector function gets called for every value sent through the source observable. The selector should return an observable for each value.

 

The SelectMany operator merges results from all observables produced by the selector function into a single observable that is returned to the user.

 

The observable returned from SelectMany fires OnCompleted once all observables produced by the selector have completed.

 

The returned observable fires OnError in the following cases:

· An error occurred in the source stream.

· An exception was thrown by the selector function.

· An error occurred in any of the produced observables.
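The behavior described above can be sketched with small in-memory observables instead of real I/O. This is only an illustration: SelectManySketch and its two methods are hypothetical names, and ToList plus the blocking Wait call are used purely to collect the results for inspection.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class SelectManySketch
{
    // Each source value produces one inner observable; SelectMany merges
    // all of them and completes once every inner observable has completed.
    public static IList<string> MergeAll()
    {
        return Observable.Range(1, 3)
            .SelectMany(n => Observable.Return("item " + n))
            .ToList()
            .Wait();
    }

    // An error in any produced observable surfaces as OnError on the merged
    // observable; Wait rethrows it here so that we can return its message.
    public static string FirstError()
    {
        try
        {
            Observable.Range(1, 3)
                .SelectMany(n => n == 2
                    ? Observable.Throw<string>(
                        new InvalidOperationException("boom"))
                    : Observable.Return("item " + n))
                .ToList()
                .Wait();
            return null;
        }
        catch (InvalidOperationException ex)
        {
            return ex.Message;
        }
    }
}
```

Note that SelectMany merges results as they arrive, so in general the order of the merged values is not guaranteed to match the order of the source.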

WriteToStream Implementation

We’ll use SelectMany in our WriteToStream operator as follows:

 

source.SelectMany(data => asyncWrite(data, 0, data.Length))

We now have an IObservable<Unit> that will fire an OnNext message with Unit for each separate write. After the last write has finished, the observable will fire OnCompleted to signal that all data has been written.

 

To clean up our operator, we want to get rid of the unnecessary Unit messages coming through. We still want to be notified of completion and of any errors that might have happened, though. We remove all OnNext messages by using the Where operator with a predicate that rejects every single value:

 

return source.SelectMany(data => asyncWrite(data, 0, data.Length))
    .Where(_ => false);
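As an aside, Rx also has an IgnoreElements operator that drops every OnNext message and forwards only OnCompleted and OnError, which is the same effect as Where(_ => false). The sketch below assumes that operator is available in the version of Rx you are using; CompletionOnlySketch and CompletionOnly are hypothetical names.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class CompletionOnlySketch
{
    // Drops all values and keeps only the completion or error signal.
    public static IObservable<Unit> CompletionOnly(
        this IObservable<Unit> writes)
    {
        return writes.IgnoreElements();
    }
}
```

Which of the two you pick is a matter of taste; the rest of this post sticks with Where.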

 

The final version of our operator looks like this:

 

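public static IObservable<Unit> WriteToStream(
    this IObservable<byte[]> source, Stream stream)
{
    // Observable wrapper around the stream's BeginWrite/EndWrite pair.
    var asyncWrite = Observable.FromAsyncPattern<
        byte[], int, int>(stream.BeginWrite, stream.EndWrite);

    // Start a write for every chunk; drop the per-write Unit values so that
    // only OnCompleted or OnError reaches the subscriber.
    return source.SelectMany(data =>
        asyncWrite(data, 0, data.Length)).Where(_ => false);
}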
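The operator does not close the stream itself, so the caller has to do that, and only after OnCompleted or OnError has fired. Here is one possible sketch of a caller that blocks until all writes are done and then disposes the stream. WriteAllAndClose is a hypothetical helper, not part of the operator, and the operator is repeated so that the sketch stands on its own.

```csharp
using System;
using System.IO;
using System.Reactive;
using System.Reactive.Linq;

public static class StreamExtensions
{
    // The operator from this post, repeated unchanged.
    public static IObservable<Unit> WriteToStream(
        this IObservable<byte[]> source, Stream stream)
    {
        var asyncWrite = Observable.FromAsyncPattern<
            byte[], int, int>(stream.BeginWrite, stream.EndWrite);

        return source.SelectMany(data =>
            asyncWrite(data, 0, data.Length)).Where(_ => false);
    }

    // Hypothetical helper: waits for every write to finish (or fail) and
    // then closes the stream, even when an error is rethrown by Wait.
    public static void WriteAllAndClose(
        this IObservable<byte[]> source, Stream stream)
    {
        using (stream)
        {
            // WriteToStream never produces values, and Wait throws on an
            // empty sequence, so ToList gives Wait one (empty) list to return.
            source.WriteToStream(stream).ToList().Wait();
        }
    }
}
```

For a file, you would typically pass a FileStream opened for asynchronous I/O (useAsync: true). If blocking is not acceptable, disposing the stream from the subscriber's OnCompleted and OnError handlers, or wrapping the stream with Observable.Using, are alternatives worth considering.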

 

WARNING: Because this operator immediately starts a write for every piece of data it receives, it can lead to performance issues when the source produces many small chunks of data.

Next in this series, we’ll look at how we can create a buffer so that we only write out bigger chunks of data and avoid this potential performance issue.

Comments

  • Anonymous
    November 29, 2010
    In the above code, when is the Stream closed? Thanks.