A foreach statement for IObservable

Asynchronous programming is a very powerful tool. It is key to responsiveness on the client and scalability on the server (see the C10K problem). However, in most modern programming languages it comes at a pretty high price: when you want to consume the result of an asynchronous operation, you have to provide a callback function to be called upon completion of the operation. The problem is that this introduces an inversion of control. As soon as you want even slightly elaborate control flow around your asynchronous operation (if statements, loops, exception handling), it quickly becomes a nightmare, and the code often ends up unreadable and hard to maintain.

People are starting to realize the need for asynchronous programming, but even the most popular frameworks for it, like Node.js or Tornado for Python, are callback-based.

This was also the case with C# until recently, but as you know, C# 5.0 introduces the new async/await keywords. Those keywords are the Holy Grail of asynchronous programming. Asynchronous code written using async/await will be as readable as synchronous code, because it no longer requires the use of callbacks.
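To make the contrast concrete, here is a minimal sketch of the same loop written both ways. FetchLengthAsync is a hypothetical operation I made up for illustration (it just waits a little and returns the length of a string); the class and method names are not from any library.

```csharp
using System;
using System.Threading.Tasks;

public static class CallbackVersusAwait
{
    // Hypothetical asynchronous operation, standing in for real I/O.
    public static async Task<int> FetchLengthAsync(string text)
    {
        await Task.Delay(10);
        return text.Length;
    }

    // Callback style: the loop has to be rewritten as a chain of
    // continuations, and errors must be forwarded by hand.
    public static Task<int> SumWithCallbacks(string[] inputs)
    {
        var done = new TaskCompletionSource<int>();
        Step(inputs, 0, 0, done);
        return done.Task;
    }

    private static void Step(string[] inputs, int index, int sum, TaskCompletionSource<int> done)
    {
        if (index == inputs.Length)
        {
            done.SetResult(sum);
            return;
        }

        FetchLengthAsync(inputs[index]).ContinueWith(t =>
        {
            if (t.IsFaulted)
                done.SetException(t.Exception.InnerExceptions);
            else
                Step(inputs, index + 1, sum + t.Result, done);
        });
    }

    // async/await style: an ordinary foreach loop, no callbacks.
    public static async Task<int> SumWithAwait(string[] inputs)
    {
        var sum = 0;
        foreach (var input in inputs)
        {
            sum += await FetchLengthAsync(input);
        }
        return sum;
    }

    public static void Main()
    {
        var inputs = new[] { "a", "bb", "ccc" };
        Console.WriteLine(SumWithCallbacks(inputs).GetAwaiter().GetResult()); // prints 6
        Console.WriteLine(SumWithAwait(inputs).GetAwaiter().GetResult());     // prints 6
    }
}
```

Both methods compute the same sum, but only the second one still looks like a loop.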

Async and await can be used out of the box with Task<T>, but any object implementing the awaitable pattern can be used with await. IObservable<T>, though, represents a collection, and await only deals with a single value. Note that you can await an observable, but that will only yield the last value of that observable. You could also use BufferAllAsync, which I showed in my last article, and then await the result, but that requires the whole sequence to be buffered, and you won't be able to do any processing before the observable has completed. What we need is an async flavor of foreach, something like:

private static async Task EnumerateAsync(IObservable<string> observable)
{
    async foreach (var item in observable)
    {
        Console.WriteLine(item);
    }
}

Unfortunately, the C# language designers preferred not to overwhelm developers, and did not include that feature in C# 5.0.

Fortunately, it is not hard to recreate. The secret is to transform the IObservable<T> into an IAsyncEnumerable<T>, then use await to enumerate asynchronously over it. IAsyncEnumerable<T> is part of the interactive side of the Reactive Extensions. You will need to add a reference to System.Interactive.Async, which you can find in the Ix_Experimental-Async NuGet package. IAsyncEnumerable<T> is basically a replica of the IEnumerable<T> interface; the only difference is that MoveNext on IAsyncEnumerator<T> returns Task<bool>, instead of bool as on the normal IEnumerator<T>. This makes perfect sense: in the asynchronous case, MoveNext should only complete when the source IObservable<T> has a new value to provide, when the observable completes, or when it fails with an error.
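To give an idea of the shape of these interfaces, here is a rough sketch. The ISketchAsyncEnumerable<T> and ISketchAsyncEnumerator<T> names are hypothetical stand-ins I declare myself so that the example compiles without the NuGet package; the real interfaces in System.Interactive.Async may differ in details (for instance by accepting a cancellation token). DelayedSequence<T> is likewise a toy source invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical mirror of IEnumerable<T>.
public interface ISketchAsyncEnumerable<T>
{
    ISketchAsyncEnumerator<T> GetEnumerator();
}

// Hypothetical mirror of IEnumerator<T>: MoveNext returns Task<bool>.
public interface ISketchAsyncEnumerator<T> : IDisposable
{
    Task<bool> MoveNext();
    T Current { get; }
}

// Toy source: each value becomes available after a short delay.
public sealed class DelayedSequence<T> : ISketchAsyncEnumerable<T>
{
    private readonly T[] _items;

    public DelayedSequence(params T[] items) { _items = items; }

    public ISketchAsyncEnumerator<T> GetEnumerator() { return new Enumerator(_items); }

    private sealed class Enumerator : ISketchAsyncEnumerator<T>
    {
        private readonly T[] _items;
        private int _index = -1;

        public Enumerator(T[] items) { _items = items; }

        public T Current { get { return _items[_index]; } }

        public async Task<bool> MoveNext()
        {
            await Task.Delay(10);                        // wait for the "next value"
            if (_index + 1 >= _items.Length) return false; // sequence completed
            _index++;
            return true;
        }

        public void Dispose() { }
    }
}

public static class AsyncEnumeratorSketch
{
    // Pulls every value out of the source, awaiting each MoveNext.
    public static async Task<List<T>> CollectAsync<T>(ISketchAsyncEnumerable<T> source)
    {
        var result = new List<T>();
        using (var enumerator = source.GetEnumerator())
        {
            while (await enumerator.MoveNext())
            {
                result.Add(enumerator.Current);
            }
        }
        return result;
    }

    public static void Main()
    {
        var source = new DelayedSequence<string>("Tick 0", "Tick 1", "Tick 2");
        var items = CollectAsync(source).GetAwaiter().GetResult();
        Console.WriteLine(string.Join(", ", items)); // prints Tick 0, Tick 1, Tick 2
    }
}
```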

Let’s assume we have an observable that we want to iterate over:

IObservable<string> io = Observable.Interval(TimeSpan.FromSeconds(1)).Select(v => "Tick " + v);
EnumerateAsync(io);

We can implement an async foreach by copying the expanded form of foreach, and adding the await keyword when calling MoveNext. The result is this:

private static async Task EnumerateAsync(IObservable<string> observable)
{
    using (var enumerator = observable.ToAsyncEnumerable().GetEnumerator())
    {
        while (await enumerator.MoveNext())
        {
            await Task.Yield();

            var item = enumerator.Current;

            // Your code here
            Console.WriteLine(item);
        }
    }
}

The await Task.Yield() statement is necessary because of a bug in the implementation of ToAsyncEnumerable. With this construct, you can process observables in a pull fashion. That means you can wrap the loop in a try/catch statement or break out of the loop, and the disposal of the subscription is handled automatically by the using block. It also guarantees that the iterations of the loop run one after the other, in a sequential fashion, so the loop body never runs concurrently with itself.
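Here is a small sketch of what that buys you: breaking out of the loop, catching an error from the source, and getting disposal for free. To keep it runnable without the Rx packages, I use a hypothetical CountingEnumerator class of my own instead of a real observable; it yields 0, 1, 2, ... and throws when it reaches a chosen value. With a real observable, you would expect Dispose to be where the subscription is released.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical pull-based source: yields 0, 1, 2, ... and fails at failAt.
public sealed class CountingEnumerator : IDisposable
{
    private readonly int _failAt;
    private int _current = -1;

    public CountingEnumerator(int failAt) { _failAt = failAt; }

    public bool Disposed { get; private set; }

    public int Current { get { return _current; } }

    public async Task<bool> MoveNext()
    {
        await Task.Delay(5);
        _current++;
        if (_current == _failAt)
            throw new InvalidOperationException("source failed");
        return true;
    }

    public void Dispose() { Disposed = true; }
}

public static class PullLoopSketch
{
    // Runs the loop and returns a log of what happened.
    public static async Task<List<string>> RunAsync(CountingEnumerator enumerator, int stopAt)
    {
        var log = new List<string>();
        try
        {
            using (enumerator)
            {
                while (await enumerator.MoveNext())
                {
                    if (enumerator.Current == stopAt)
                    {
                        log.Add("break");
                        break; // leaving the loop still disposes the enumerator
                    }
                    log.Add("item " + enumerator.Current);
                }
            }
        }
        catch (InvalidOperationException ex)
        {
            // An error from the source surfaces as a normal exception.
            log.Add("caught: " + ex.Message);
        }
        log.Add("disposed: " + enumerator.Disposed);
        return log;
    }

    public static void Main()
    {
        // Stops at 2, well before the source would fail at 5.
        var first = RunAsync(new CountingEnumerator(5), 2).GetAwaiter().GetResult();
        Console.WriteLine(string.Join(" | ", first));
        // prints item 0 | item 1 | break | disposed: True

        // The source fails at 1, before the loop would stop at 10.
        var second = RunAsync(new CountingEnumerator(1), 10).GetAwaiter().GetResult();
        Console.WriteLine(string.Join(" | ", second));
        // prints item 0 | caught: source failed | disposed: True
    }
}
```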