Rx on the server, part 4 of n: Buffering output to a stream
Previously in these series we saw:
- Reading a stream asynchronously
- Reading strings asynchronously, line by line
- Writing an observable to a stream
The last post finished with a warning regarding sending to many small chunks of data to the stream asynchronously.
In this post we’ll take a look at avoiding that problem by buffering the stream until a minimum size has been reached.
Signature & Pattern
As our WriteToStream method expects a IObservable<byte[]> as its input, our transformation is going to go from IObservable<byte[]> to IObservable<byte[]>:
public static IObservable<byte[]> MinimumBuffer(this IObservable<byte[]> source,
int bufferSize)
We’re going to be working on an existing observable and produce a new observable out of that. So this is a good fit for the Observable.CreateWithDisposable pattern:
return Observable.CreateWithDisposable<byte[]>(observer =>
{
…
}
Requirements
Let’s look at the task at hand, there are a couple of things we need to do:
- Subscribe to the original observable
- Handle OnNext messages
- Add the message to a buffer
- Fire out a new array based on the content of the buffer when buffer becomes bigger than bufferSize
- Clear the buffer
- Send through OnError messages immediately
- Complete the stream, sending out any remaining data in the buffer on an OnCompleted message.
- Return a disposable object that will unsubscribe the original subscription.
Implementation
As we need a buffer that can be accessed through out the subscription, we begin our observable.CreateWithDisposable body with allocating a list to hold that buffer:
var data = new List<byte>();
Next, we’re going to subscribe to the original stream. As that is the only subscription we’re going to manage, we’ll return the disposable object from that subscription as the object maintaining the subscription to our output stream:
return source.Subscribe(value =>
{
…
},
error=>
{
…
},
() =>
{
…
});
Now it is time to fill in the details of the subscription.
First we’ll tackle the OnNext messages:
data.AddRange(value);
if (data.Count > bufferSize)
{
observer.OnNext(data.ToArray());
data.Clear();
}
OnError will be straightforward: As we want to maintain abort semantics, we’ll send through any exception from the source stream immediately:
observer.OnError(error);
This statement can be reduced to a lambda expression and even further to a method group, which we’ll see in the final code.
Once we get an OnCompleted message, we could be in the situation that the buffer still contains data, but hasn’t reached the boundary of bufferSize.
In that case, we should empty the buffer before sending out OnCompleted to our subscribers:
if (data.Count > 0)
observer.OnNext(data.ToArray());
observer.OnCompleted();
Final Code
And with that, we have the full implementation of our buffering operator:
public static IObservable<byte[]> MinimumBuffer(this IObservable<byte[]> source,
int bufferSize)
{
return Observable.CreateWithDisposable<byte[]>(observer =>
{
var data = new List<byte>();
return source.Subscribe(value =>
{
data.AddRange(value);
if (data.Count > bufferSize)
{
observer.OnNext(data.ToArray());
data.Clear();
}
},
observer.OnError,
() =>
{
if (data.Count > 0)
observer.OnNext(data.ToArray());
observer.OnCompleted();
});
});
}
We can now use this operator to avoid performance issues when writing out our observable to a stream:
source.MinimumBuffer(2 << 16).WriteToStream(stream);
What’s next?
We’ve spent quite some time so far in these series on talking about Streams.
Next we’ll take a look at how we could use these stream operations from ASP.NET.
We’ll try to plug Rx into the IHttpAsyncHandler mechanism and use our stream operators from within that pattern.