Observable.Context, Observable.SubscribeOn and Observable.ObserveOn

People have asked about these three APIs on the Rx forum, and to do them justice I thought I should write a blog post explaining the reasoning behind them.

A world without Synchronization

Let’s assume Rx didn’t use a SynchronizationContext to post its notifications on and we have two observable streams:

image

In the above sample, the different colors and shapes represent the different threads these values are sent out on (remember, Rx is a push model, so you’re being called by the sender of the event, on whichever thread the sender chooses).

Now let’s imagine that we use the Merge operator on these two streams:

image

We now get an observable stream that will call OnNext on different threads, depending on which thread the sender of each value used.

This can get quite annoying, as the method that you hook up to the Subscribe of this merged stream needs to be able to accept messages from either thread. This is especially bad if you want to update the UI in the handler (which needs to be done on the UI thread). In that case you end up having to wrap all your subscribe functions in a call that transfers control to the UI thread.

ObserveOn

This problem can be solved by introducing the ObserveOn Operator.

ObserveOn is defined as: IObservable<T> ObserveOn<T>(this IObservable<T> source, SynchronizationContext context).

It takes the source observable and returns an observable that, for each notification in source, posts a call on the SynchronizationContext to fire that message on the output observable.
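To make the description above concrete, here is a minimal sketch of how such an operator could be written using only the IObservable<T>, IObserver<T> and SynchronizationContext types. It is an illustration under assumptions, not the Rx implementation: the names ObserveOnSketch, ObserveOnContext, PostingObservable and PostingObserver are made up for this example, and whether notifications keep their order depends on the context you pass in (the base SynchronizationContext posts to the thread pool and gives no ordering guarantee).

```csharp
using System;
using System.Threading;

// Illustrative sketch only. All type and method names here are hypothetical;
// this is not how the Rx library itself implements ObserveOn.
public static class ObserveOnSketch
{
    // Wraps source so that every notification is posted to the given context.
    public static IObservable<T> ObserveOnContext<T>(
        this IObservable<T> source, SynchronizationContext context)
    {
        return new PostingObservable<T>(source, context);
    }

    private sealed class PostingObservable<T> : IObservable<T>
    {
        private readonly IObservable<T> _source;
        private readonly SynchronizationContext _context;

        public PostingObservable(IObservable<T> source, SynchronizationContext context)
        {
            _source = source;
            _context = context;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            // Note: the subscription itself still runs on the caller's thread.
            return _source.Subscribe(new PostingObserver<T>(observer, _context));
        }
    }

    private sealed class PostingObserver<T> : IObserver<T>
    {
        private readonly IObserver<T> _target;
        private readonly SynchronizationContext _context;

        public PostingObserver(IObserver<T> target, SynchronizationContext context)
        {
            _target = target;
            _context = context;
        }

        // Each notification is forwarded by posting a callback to the context.
        public void OnNext(T value)
        {
            _context.Post(_ => _target.OnNext(value), null);
        }

        public void OnError(Exception error)
        {
            _context.Post(_ => _target.OnError(error), null);
        }

        public void OnCompleted()
        {
            _context.Post(_ => _target.OnCompleted(), null);
        }
    }
}
```

With a UI context, a call such as source.ObserveOnContext(uiContext).Subscribe(observer) would let the observer touch UI elements without any manual thread hopping in the handler.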

SubscribeOn

Next, let’s look at the Concat operator. The Concat operator works as follows (for simplicity, I assume no error messages occur here):

When a user subscribes to the output stream of Concat, Concat subscribes to the first observable from its input and passes all of its messages through to the output. The moment the first input fires the OnCompleted message, Concat disposes that source, subscribes to the next source, and continues passing messages from that source to the output. It does this until all sources have been iterated.

image

If messages from each of the sources come out on different threads, there is one big problem here: the subscribe to the next source is triggered by the OnCompleted of the previous source, which means that the subscribe is executed on whatever thread OnCompleted fires on.

How can this be a big problem? Well, imagine that your second source observable was created by Observable.FromEvent. FromEvent is implemented to hook up to the underlying .NET event the moment you subscribe to the observable. It turns out that Silverlight will throw if you try to hook up an event on a thread other than the UI thread.

Try writing this code:
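The Concat behavior described above can be sketched in a few lines. This is a simplified illustration, not the Rx implementation: ConcatSketch, ConcatAll, ConcatObservable and Run are hypothetical names, and the sketch is not hardened for concurrent use. The point to notice is that the call to Subscribe on the next source sits inside OnCompleted, so it runs on whatever thread the finished source used.

```csharp
using System;

// Illustrative sketch only. All names are hypothetical and the code is not
// hardened for concurrent use; it is not the Rx implementation of Concat.
public static class ConcatSketch
{
    public static IObservable<T> ConcatAll<T>(params IObservable<T>[] sources)
    {
        return new ConcatObservable<T>(sources);
    }

    private sealed class ConcatObservable<T> : IObservable<T>
    {
        private readonly IObservable<T>[] _sources;

        public ConcatObservable(IObservable<T>[] sources)
        {
            _sources = sources;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            var run = new Run<T>(_sources, observer);
            run.SubscribeNext(); // first source: subscribed on the caller's thread
            return run;
        }
    }

    private sealed class Run<T> : IObserver<T>, IDisposable
    {
        private readonly IObservable<T>[] _sources;
        private readonly IObserver<T> _output;
        private int _index;
        private IDisposable _current;
        private volatile bool _disposed;

        public Run(IObservable<T>[] sources, IObserver<T> output)
        {
            _sources = sources;
            _output = output;
        }

        public void SubscribeNext()
        {
            if (_disposed) return;
            if (_index == _sources.Length)
            {
                _output.OnCompleted(); // every source has completed
                return;
            }
            _current = _sources[_index++].Subscribe(this);
        }

        public void OnNext(T value)
        {
            if (!_disposed) _output.OnNext(value);
        }

        public void OnError(Exception error)
        {
            if (!_disposed) _output.OnError(error);
        }

        public void OnCompleted()
        {
            // This runs on the thread chosen by the source that just finished,
            // so the next Subscribe call happens on that thread too.
            _current?.Dispose();
            SubscribeNext();
        }

        public void Dispose()
        {
            _disposed = true;
            _current?.Dispose();
        }
    }
}
```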

image

and you’ll get this error:

image

In WinForms and WPF you won’t get an exception, but a quick look in Reflector will show that there is a race condition when adding event handlers from multiple threads in WinForms and WPF (as these libraries have manual event add/remove implementations).

This issue could be fixed in several ways:

0: Require all operators to do their subscribe on a safe thread. This is hard to guarantee if anyone can implement IObservable.

1: Require all inputs to Concat to call ObserveOn before being fed into Concat, e.g.:

var output = Observable.Concat(first.ObserveOn(context), second.ObserveOn(context), third.ObserveOn(context));

2: Introduce a new Operator that will wrap an Observable and will schedule its subscribe on the SynchronizationContext provided:

IObservable<T> SubscribeOn<T>(this IObservable<T> source, SynchronizationContext context);

e.g.: var output = Observable.Concat(first, second, third).SubscribeOn(context);

These two operators combined make context switching easier, but it still requires a lot of thought, and tough debugging if you get it wrong.
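As a rough illustration of option 2, the sketch below shows one way a SubscribeOn-style wrapper could post the Subscribe call onto a SynchronizationContext. It is written under assumptions and is not the Rx implementation: SubscribeOnSketch, SubscribeOnContext, DeferredObservable and DeferredSubscription are hypothetical names, and for brevity Dispose runs on the caller's thread rather than being posted back to the context.

```csharp
using System;
using System.Threading;

// Illustrative sketch only. All names are hypothetical; this is not the Rx
// implementation of SubscribeOn.
public static class SubscribeOnSketch
{
    // Wraps source so that the Subscribe call is posted to the given context.
    public static IObservable<T> SubscribeOnContext<T>(
        this IObservable<T> source, SynchronizationContext context)
    {
        return new DeferredObservable<T>(source, context);
    }

    private sealed class DeferredObservable<T> : IObservable<T>
    {
        private readonly IObservable<T> _source;
        private readonly SynchronizationContext _context;

        public DeferredObservable(IObservable<T> source, SynchronizationContext context)
        {
            _source = source;
            _context = context;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            var handle = new DeferredSubscription();
            _context.Post(_ =>
            {
                // Skip the subscribe if the caller already unsubscribed.
                if (!handle.IsDisposed)
                {
                    handle.Set(_source.Subscribe(observer));
                }
            }, null);
            return handle;
        }
    }

    // Holds the real subscription once the posted Subscribe call has run.
    private sealed class DeferredSubscription : IDisposable
    {
        private readonly object _gate = new object();
        private IDisposable _inner;
        private bool _disposed;

        public bool IsDisposed
        {
            get { lock (_gate) { return _disposed; } }
        }

        public void Set(IDisposable inner)
        {
            lock (_gate)
            {
                if (!_disposed)
                {
                    _inner = inner;
                    return;
                }
            }
            inner?.Dispose(); // disposed while the subscribe was in flight
        }

        public void Dispose()
        {
            IDisposable inner;
            lock (_gate)
            {
                if (_disposed) return;
                _disposed = true;
                inner = _inner;
                _inner = null;
            }
            inner?.Dispose();
        }
    }
}
```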

Observable.Context

We found that although ObserveOn and SubscribeOn are very useful for changing the SynchronizationContext, they still require a lot of unnecessary thought on the developer’s part, so we worked out a global solution:

By default, all operators in the Rx library use the SynchronizationContext stored in Observable.Context to send out event values to the observers. This ensures that, by default, the library behaves consistently: all messages from these operators come out on the same synchronization context.
Users can still override this and have the output go to a different context by using the ObserveOn and SubscribeOn operators.

Now why don’t we use SynchronizationContext.Current? Well, the problem is that it is a thread-local property that gives the SynchronizationContext for the current thread, and that is not the one we’re looking for. We want the one where the messages need to end up (often the UI thread).
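The per-thread nature of SynchronizationContext.Current is easy to check for yourself. The small probe below is a sketch for illustration (ContextProbe and NewThreadSeesNoContext are hypothetical names): it installs a context on the calling thread, starts a plain worker thread, and reports whether the worker saw no context of its own while the calling thread kept its own.

```csharp
using System;
using System.Threading;

// Illustrative probe only; the class and method names are hypothetical.
public static class ContextProbe
{
    // Returns true when the calling thread has a context installed but a
    // freshly started worker thread observes none.
    public static bool NewThreadSeesNoContext()
    {
        SynchronizationContext previous = SynchronizationContext.Current;
        SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
        try
        {
            SynchronizationContext seenOnWorker = null;
            var worker = new Thread(() => seenOnWorker = SynchronizationContext.Current);
            worker.Start();
            worker.Join();

            return SynchronizationContext.Current != null && seenOnWorker == null;
        }
        finally
        {
            // Restore whatever context the caller had before the probe.
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }
}
```

A sender running on such a worker thread therefore cannot find the UI context through Current, which is why a separately stored target context is needed.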

SynchronizationContexts.*

As we found that there is a set of commonly used SynchronizationContexts, we have provided helpers for easy access to these contexts:

  • CurrentDispatcher: wraps the WPF Dispatcher, to post on the WPF UI Thread for use in WPF and Silverlight applications
  • WindowsForms: will use the current Windows Forms form to post back on the Windows Forms UI Thread
  • ReactiveEventLoop: will use a single background thread to post messages in a synchronized manner for applications that don’t have UI
  • FromControl(Control control): will use control to post back on the Windows Forms UI Thread (if WinForms controls are embedded in a native UI where there is no current Form).
  • CreateEventLoop(string name): will create a new background thread to post messages to for the lifetime of the SynchronizationContext.
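To give a feel for what an event-loop context like the last helper involves, here is a minimal sketch of a SynchronizationContext backed by one named background thread. It is an assumption-laden illustration, not the code behind CreateEventLoop or ReactiveEventLoop: EventLoopContext is a hypothetical name, exceptions thrown by callbacks are not handled, and posting after Dispose will throw.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Illustrative sketch only; EventLoopContext is a hypothetical name and this
// is not the Rx implementation of its event loop helpers.
public sealed class EventLoopContext : SynchronizationContext, IDisposable
{
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();
    private readonly Thread _thread;

    public EventLoopContext(string name)
    {
        _thread = new Thread(Run) { Name = name, IsBackground = true };
        _thread.Start();
    }

    // Queues the callback; it runs later on the loop thread, in posting order.
    public override void Post(SendOrPostCallback d, object state)
    {
        _queue.Add(() => d(state));
    }

    // Runs the callback on the loop thread and waits until it has finished.
    public override void Send(SendOrPostCallback d, object state)
    {
        if (Thread.CurrentThread == _thread)
        {
            d(state); // already on the loop thread; queueing would deadlock
            return;
        }

        using (var done = new ManualResetEventSlim())
        {
            _queue.Add(() =>
            {
                try { d(state); }
                finally { done.Set(); }
            });
            done.Wait();
        }
    }

    private void Run()
    {
        // Make this context the current one on the loop thread itself.
        SetSynchronizationContext(this);
        foreach (Action work in _queue.GetConsumingEnumerable())
        {
            work();
        }
    }

    // Stops accepting work; the loop thread exits once the queue drains.
    public void Dispose()
    {
        _queue.CompleteAdding();
    }
}
```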

Default

As Rx needs Observable.Context to have a value, we have chosen a default for the property:

On .NET 3.5 & 4, the default is set to the ReactiveEventLoop, as we don’t know if the application is a Console app, ASP.NET hosted or has a WinForms/WPF UI.

In your UI applications, you will want to set the right SynchronizationContext for Rx as one of the first things upon launch.

On Silverlight, as there is always a WPF engine present and 99% of the Silverlight applications have a UI, the default is set to SynchronizationContexts.CurrentDispatcher.

Future

As this is a feature we added only recently and it has a lot of complexity, expect changes to this pattern in the future.

Please let us know what you like and don’t like about this pattern on the Rx forum.

As it is Thanksgiving here in the US next week, please post on the forum and not in this blog so we can keep track of your questions over the holidays.

Happy Holidays,

The RxTeam

Comments

  • Anonymous
    October 01, 2010
    In the drop I just pulled down I don't see Observable.Context... is there an equivalent result somewhere?