A Brief Introduction to the Reactive Extensions for .NET, Rx

Wow, it has been a long time since I have blogged.  Yesterday, we made the first official public release of Rx on devlabs.  And that means that I can now talk about what has been on my mind for the past while: Rx.  What is Rx?  Rx is short for the Reactive Extensions for .NET.  Think of it as a prescription strength library for writing reactive programs in your favorite .NET language.

Today, I want to give you a brief introduction to Rx that complements my screencast.  In subsequent posts, I will dive deep into the semantics of Rx, providing detailed explanations and examples.  Hopefully, this will positively impact your day-to-day work and allow you to have a lot of fun in the process.  Please feel free to ask any questions that you may have or provide feedback about Rx.  This first release is a preview of the technology.  We will provide regular updates that both extend and refine the library, and we want you to be a part of that process.

First Steps

Rx is currently available for .NET Framework 3.5 SP1, .NET Framework 4 Beta 2, and Silverlight 3.  Download and install one of these versions and then open Visual Studio.

Create a WPF Application or a Silverlight Application.

The Chrome

When the designer opens, replace the existing Grid control with the following XAML.

<Canvas>
    <TextBlock
        x:Name="textBlock"
        Text="Rx Rocks!" />
    <Image
        x:Name="image"
        Source="https://upload.wikimedia.org/wikipedia/commons/7/7a/Rx_symbol.png"
        Width="100"
        Height="100"
        Canvas.Left="0"
        Canvas.Top="25" />
</Canvas>

Then switch to the code view.

Starting with Rx

To begin using Rx, first add a reference to System.Reactive and System.CoreEx.  The two other assemblies included with this release are System.Threading (task parallel library) and System.Interactive (more enumerable goodness).

Once you have added references to System.Reactive and System.CoreEx, you can use the reactive operators found in Observable.

Observable Sequences

Observables, like enumerables, are sequences.  The only difference is that enumerables are pull-based and observables are push-based.  Both kinds of sequence consist of a possibly empty series of values, optionally followed by either normal termination or abnormal termination.

When you want to enumerate an enumerable:

1.  Get the enumerator from the enumerable

2.  Call MoveNext and then read Current on the enumerator for each value in the sequence

3.  The sequence ends when MoveNext returns false (normal termination) or MoveNext or Current throws an exception (abnormal termination)

The user writes a foreach statement over an enumerable sequence.

foreach (var x in xs)
    F(x);

With enumerables, each value is pulled out of the source and bound to the iteration variable before executing the user code.  Note that the consumer controls this process.
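
To make the three pull steps concrete, here is a minimal sketch of roughly what a foreach statement expands to.  The names PullDemo and Enumerate are my own, chosen for illustration; only IEnumerable<T>, IEnumerator<T>, MoveNext, and Current come from the framework.

```csharp
using System;
using System.Collections.Generic;

static class PullDemo
{
    // Roughly what the compiler does for: foreach (var x in xs) f(x);
    public static void Enumerate<T>(IEnumerable<T> xs, Action<T> f)
    {
        // Step 1: get the enumerator from the enumerable.
        using (IEnumerator<T> e = xs.GetEnumerator())
        {
            // Steps 2 and 3: the consumer asks for each value and stops
            // when MoveNext returns false. An exception thrown by
            // MoveNext or Current would propagate to the caller.
            while (e.MoveNext())
            {
                f(e.Current);
            }
        }
    }

    static void Main()
    {
        Enumerate(new[] { 1, 2, 3 }, x => Console.WriteLine(x));
    }
}
```

The loop only advances when the consumer calls MoveNext, which is what "pull-based" means here.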

When you want to observe an observable sequence:

1.  Subscribe an observer to the observable

2.  OnNext is called on the observer for each value in the sequence

3.  The sequence ends when OnCompleted (normal termination) or OnError (abnormal termination) is called on the observer

The user writes a subscribe statement over an observable sequence.

xs.Subscribe(x => F(x));

With observables, each value is pushed out of the source and bound to the handler variable before executing the user code.  Note that the producer controls this process.
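
The push side can be sketched without Rx itself, using only the IObservable<T> and IObserver<T> interfaces that ship in .NET 4.  The classes FixedSource and RecordingObserver below are hypothetical, written only for this illustration; they are not part of Rx.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that records every message it receives.
sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(T value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError(" + error.Message + ")"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

// Hypothetical push-based source: pushes a fixed set of values, then completes.
sealed class FixedSource<T> : IObservable<T>
{
    private readonly T[] values;

    public FixedSource(params T[] values) { this.values = values; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        // The producer decides when each value is delivered.
        foreach (T value in values)
            observer.OnNext(value);
        observer.OnCompleted();   // normal termination
        return new NoOp();        // nothing left to unsubscribe from
    }

    private sealed class NoOp : IDisposable
    {
        public void Dispose() { }
    }
}

static class PushDemo
{
    static void Main()
    {
        var observer = new RecordingObserver<int>();
        new FixedSource<int>(1, 2, 3).Subscribe(observer);
        foreach (string line in observer.Log)
            Console.WriteLine(line);
    }
}
```

Here the source calls OnNext three times and then OnCompleted; the observer never asks for a value.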

Hello, World!

Let’s write our first Rx application, the obligatory “Hello, World!” application.

Remember all those times that you wrote UI applications with asynchronous operations and got the dreaded cross-thread operation exception?

Well, Rx makes this simpler despite the fact that it has a lot of asynchronous and event-based operations going on at once.

The user may specify the default SynchronizationContext to which Rx messages are sent.  By default, Rx uses a reactive event loop, except in Silverlight, where it uses the dispatcher.  The user can change this by setting Observable.Context to the appropriate SynchronizationContext.  Rx provides a few commonly used synchronization contexts in the class SynchronizationContexts.

Since we are using WPF, we should set the default context to the dispatcher context.

Observable.Context = SynchronizationContexts.CurrentDispatcher;

Now, we can use the UI without getting cross-thread exceptions.

Rx includes a number of basic operations for constructing observable sequences.  For example, Return creates a singleton sequence that contains an OnNext message with the specified value followed by an OnCompleted message.  We can then subscribe to the observable using a value handler which specifies what to do with each OnNext message from the observable sequence.

var helloWorld = Observable.Return("Hello, World!");

helloWorld.Subscribe(value => textBlock.Text = value);

When we run the application, we see that the text is set to “Hello, World!”.

A Second Step

Another basic operation for constructing observable sequences is the Interval operator.  It creates an observable sequence that repeatedly sends an OnNext message after the specified time interval.  For example, we can create an observable sequence that sends a message every second.

var timer = Observable.Interval(TimeSpan.FromSeconds(1));

timer.Subscribe(value => textBlock.Text = value.ToString());

Wrapping .NET Events

Other sources of observable sequences are existing .NET events and asynchronous computations.  We can bring these existing reactive sources into Rx by using conversions.  For example, we can convert .NET events to observable sequences using the FromEvent operator.  The user must specify the type of EventArgs for the event, the target, and the event name.  We can use this to convert the MouseDown event on the image to an observable sequence of events:

var mouseDown = Observable.FromEvent<MouseButtonEventArgs>(image, "MouseDown");
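
To show the idea behind such a conversion, here is a minimal sketch that wraps an event by hand.  It is not how FromEvent is implemented: FromEvent takes a target and an event name, whereas this sketch assumes explicit add and remove delegates, and it pushes the event args directly rather than an event wrapper.  Sensor, ReadingEventArgs, EventObservable, ActionObserver, and EventDemo are all hypothetical names.

```csharp
using System;
using System.Collections.Generic;

sealed class ReadingEventArgs : EventArgs
{
    public readonly int Value;
    public ReadingEventArgs(int value) { Value = value; }
}

// Hypothetical event source standing in for a UI element such as the image.
sealed class Sensor
{
    public event EventHandler<ReadingEventArgs> Reading;

    public void Raise(int value)
    {
        EventHandler<ReadingEventArgs> handler = Reading;
        if (handler != null) handler(this, new ReadingEventArgs(value));
    }
}

// Hypothetical adapter: subscribing attaches a handler, disposing detaches it.
sealed class EventObservable<TArgs> : IObservable<TArgs> where TArgs : EventArgs
{
    private readonly Action<EventHandler<TArgs>> add;
    private readonly Action<EventHandler<TArgs>> remove;

    public EventObservable(Action<EventHandler<TArgs>> add, Action<EventHandler<TArgs>> remove)
    {
        this.add = add;
        this.remove = remove;
    }

    public IDisposable Subscribe(IObserver<TArgs> observer)
    {
        // Each raised event becomes one OnNext message.
        EventHandler<TArgs> handler = (sender, args) => observer.OnNext(args);
        add(handler);
        return new Detach(() => remove(handler));
    }

    private sealed class Detach : IDisposable
    {
        private readonly Action action;
        public Detach(Action action) { this.action = action; }
        public void Dispose() { action(); }
    }
}

sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public ActionObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

static class EventDemo
{
    public static List<int> Run()
    {
        var sensor = new Sensor();
        var seen = new List<int>();
        var readings = new EventObservable<ReadingEventArgs>(
            h => sensor.Reading += h,
            h => sensor.Reading -= h);

        IDisposable subscription = readings.Subscribe(
            new ActionObserver<ReadingEventArgs>(args => seen.Add(args.Value)));

        sensor.Raise(1);          // delivered to the observer
        subscription.Dispose();   // detaches the handler from the event
        sensor.Raise(2);          // no longer delivered
        return seen;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));   // prints "1"
    }
}
```

The useful property is that unsubscribing from the event becomes disposing an object, which is what lets operators manage subscriptions on your behalf.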

LINQ to Rx

Observable contains an implementation of the standard query operators for observable sequences and that means that we can use LINQ to write queries over observable sequences.  For example, we can modify our mouseDown observable sequence to contain a sequence of position values by projecting each event to the current mouse position relative to the image.

var mouseDown = from evt in Observable.FromEvent<MouseButtonEventArgs>(image, "MouseDown")
                select evt.EventArgs.GetPosition(image);

Composition

Rx also contains a number of operators that compose observable sequences in much the same way that LINQ to Objects composes enumerable sequences.  For example, the operator Until takes all of the messages from the left source until an OnNext message occurs on the right source, at which time it unsubscribes from both the left source and the right source.

var q = timer.Until(mouseDown);

q.Subscribe(value => textBlock.Text = value.ToString());
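
The semantics of Until can be sketched by hand on top of the plain IObservable<T> and IObserver<T> interfaces.  This is a simplified illustration, not the Rx implementation: it takes a handler instead of returning a new observable, and it ignores OnError and OnCompleted.  ManualSource, ActionDisposable, ActionObserver, and SubscribeUntil are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hand-rolled source that values can be pushed into.
sealed class ManualSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new ActionDisposable(() => observers.Remove(observer));
    }

    public void Push(T value)
    {
        // Iterate over a copy so observers may unsubscribe while being notified.
        foreach (IObserver<T> observer in observers.ToArray())
            observer.OnNext(value);
    }
}

sealed class ActionDisposable : IDisposable
{
    private readonly Action action;
    public ActionDisposable(Action action) { this.action = action; }
    public void Dispose() { action(); }
}

sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public ActionObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

static class UntilSketch
{
    // Forwards values from left to onNext until right produces its first
    // value, then drops both subscriptions.
    public static IDisposable SubscribeUntil<TLeft, TRight>(
        IObservable<TLeft> left, IObservable<TRight> right, Action<TLeft> onNext)
    {
        IDisposable leftSub = left.Subscribe(new ActionObserver<TLeft>(onNext));
        IDisposable rightSub = null;
        rightSub = right.Subscribe(new ActionObserver<TRight>(ignored =>
        {
            leftSub.Dispose();
            if (rightSub != null) rightSub.Dispose();
        }));
        return new ActionDisposable(() => { leftSub.Dispose(); rightSub.Dispose(); });
    }

    public static List<long> Demo()
    {
        var timer = new ManualSource<long>();
        var mouseDown = new ManualSource<string>();
        var seen = new List<long>();

        SubscribeUntil(timer, mouseDown, value => seen.Add(value));
        timer.Push(0);
        timer.Push(1);
        mouseDown.Push("click");   // the right source fires: both subscriptions end
        timer.Push(2);             // ignored, nobody is listening any more
        return seen;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Demo()));   // prints "0, 1"
    }
}
```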

Drag and Drop

We can put together these simple pieces and build drag and drop.  In the screencast, I wrote drag and drop using the mouse deltas; here I will write it using the mouse offset to show another way to do the same thing.  As you begin to use Rx, you will find that you can often simplify your queries repeatedly, and if you do it right, you will end up with a query that looks remarkably like the specification of the problem.

What is the specification of drag and drop?  Every time the user clicks down on the image, we want to move the image relative to where she clicked down until the mouse button goes up.

Therefore, we need three events: the mouse down on the image, the mouse move on the window, and the mouse up on the window.  For the mouse down, we need the position relative to the image, for the mouse move, we need the position relative to the window, and for the mouse up we do not need the position.

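// Mouse down on the image, as a position relative to the image.
var mouseDown = from evt in Observable.FromEvent<MouseButtonEventArgs>(image, "MouseDown")
                select evt.EventArgs.GetPosition(image);

// Mouse up anywhere on the window; the position is not needed.
var mouseUp = Observable.FromEvent<MouseButtonEventArgs>(this, "MouseUp");

// Mouse move on the window, as a position relative to the window.
var mouseMove = from evt in Observable.FromEvent<MouseEventArgs>(this, "MouseMove")
                select evt.EventArgs.GetPosition(this);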

Now, we can write a query that expresses our intent.  When the mouse goes down, grab the offset of the mouse relative to the image, and then listen to the mouse moves relative to the window until the mouse goes up.  For each mouse move during this time, compute the image position as the mouse position minus the original offset relative to the image.

var q = from imageOffset in mouseDown
        from pos in mouseMove.Until(mouseUp)
        select new { X = pos.X - imageOffset.X, Y = pos.Y - imageOffset.Y };

Finally, we can subscribe to the observable sequence of positions and update the position of the image.

q.Subscribe(value =>
{
    Canvas.SetLeft(image, value.X);
    Canvas.SetTop(image, value.Y);
});

Looking Forward

Drag and drop is fun, but it is only one small example of the power of Rx.  Next time, we will start our deep dive into the semantics of Rx.  Until then take the time to download it and take it for a test drive.

Comments

  • Anonymous
    November 18, 2009
    Glad to see the blog revived - I've been lurking here hoping for that for a looong time!

  • Anonymous
    November 18, 2009
    Does all this mean that the event-based async pattern will get deprecated, and the Begin/End IAsyncResult pattern make a comeback? The event-based one is very hard to compose and generally a PITA to work with. I recall completely abandoning the WebServices support in VS2005 when the async model for generated interfaces changed between the beta (or was it a CTP) and the final release.

  • Anonymous
    November 18, 2009
    @Barry Kelly No, the event-based async pattern and the Begin/End IAsyncResult pattern are not deprecated; however, you can choose to write Rx on top of either model if that makes it more palatable.

  • Anonymous
    November 18, 2009
    Good to see you are back again with your blog! I wondered a month ago whether to remove your blog from my Google Reader or not. I'm glad I didn't! Hope you're back for good, or at least a long time :)

  • Anonymous
    November 18, 2009
    I have heard mixed reports about whether Rx is going to be included in .NET 4.0 itself - does the DevLabs release indicate that it won't be in .NET 4.0?

  • Anonymous
    November 18, 2009
    Hi Wes, awesome job on Rx, it's a real work of art. Is there any reason why all the assemblies in this drop are not CLSCompliant?

  • Anonymous
    November 18, 2009
    Hey, I'm glad to see you blogging again. Keep it up! I always like what you write.

  • Anonymous
    November 18, 2009
    Hooray! The talented Wes Dyer is blogging once again! Great to have you blogging again, Wes. I hope to see more posts on Rx coming from you.

  • Anonymous
    November 19, 2009
    @jonskeet Unfortunately, the only things from Rx making it into .NET 4.0 are the two interfaces: IObservable and IObserver. @Jacob Stanley No reason.  Noted as a bug, we'll fix it in a minor release.

  • Anonymous
    November 19, 2009
    Hey Wes, great to see you blogging again!! Terrific job on Rx!

  • Anonymous
    November 19, 2009
    @Wes: That's no problem - I just want to get my facts straight as I'm hoping to write about Rx in the second edition of C# in Depth as an example of an alternative in-process LINQ provider. (I was going to write about my own hobby version - Push LINQ - but obviously Rx is a better choice.) Please ping me if you'd be interested in tech-proofing the Rx section of the book. I should be writing that section in a few weeks.

  • Anonymous
    November 19, 2009
    Wes, thank you for this article.

  • Anonymous
    November 22, 2009
    Enjoyed the Rx talk at PDC.  I'd love to download Rx for .NET 4 Beta 2, but the download link appears to be broken (same on the DevLabs site).  The link for the .NET 3.5 SP1 version works, though.

  • Anonymous
    November 24, 2009
    This Interval sample does not seem to work in Beta 2.  Has anyone else had success with this?  The lambda fires every second, but the TextBlock never updates. I am having lots of trouble getting any of this to work; can we have a sample project with these basic functions in it?  Maybe I am missing a reference somewhere, but I followed the above instructions explicitly and q.Subscribe(value => textBlock.Text = value.ToString()); never updates the TextBlock. Thanks; when I figure out how this works, it is going to help tremendously with what we are doing with pushing sensor information to the UI.

  • Anonymous
    November 24, 2009
    Actually this appears to be a problem with the VB.Net implementation: In C# this works fine, everything updates

        private void Window_Loaded(object sender, RoutedEventArgs e)
        {
            Observable.Context = System.Threading.SynchronizationContexts.CurrentDispatcher;
            var timer = Observable.Interval(TimeSpan.FromSeconds(1));
            timer.Subscribe(value => MyTextBlock.Text = value.ToString());
        }

    However, the same thing in VB does NOT:

        Private Sub MainWindow_Loaded(ByVal sender As Object, ByVal e As System.Windows.RoutedEventArgs) Handles Me.Loaded
            Observable.Context = System.Threading.SynchronizationContexts.CurrentDispatcher
            Dim timer = Observable.Interval(TimeSpan.FromSeconds(1))
            timer.Subscribe(Function(value) MyTextBlock.Text = value.ToString())
        End Sub

    Is this a problem with Lambdas in VB?  Or is it something in Reactive?

  • Anonymous
    November 24, 2009
    VB works fine... My translation skills didn't :-( Changing timer.Subscribe(Function(value) MyTextBlock.Text = value.ToString()) to timer.Subscribe(Sub(value) MyTextBlock.Text = value.ToString()) made everything work perfectly.

  • Anonymous
    November 24, 2009
    It looks like you figured it out.  If you use Function instead of Sub in VB.NET then "=" means relational equality not assignment.

  • Anonymous
    December 14, 2009
    I'm very new to Rx, so this might be a stupid question. Does Rx provide a way to transform regular .NET events (which work in a push-mode) into a pull-mode? In other words, I see that you can get an IObservable from an event, but I don't see if you can get an IEnumerable from an event. Does Rx help with that?

  • Anonymous
    December 14, 2009
    That is a great question. Rx does support a way to transform .NET events into a pull mode.  This is done in two phases:

  1.  Convert .NET event to Observable
  2.  Convert Observable to Enumerable

    Typically, you can use either FromEvent or Create to do (1) and you can use ToEnumerable, Latest, Next, or MostRecent for (2) depending on which semantics you want for the conversion.

  • Anonymous
    December 14, 2009
    Thanks for the quick answer. I have another question, about the IObservable<T> example shown at MSDN (http://msdn.microsoft.com/en-us/library/dd990377%28VS.100%29.aspx). The Subscribe function should return an IDisposable object that, when disposed, will remove the observer from the observable. I don't believe the implementation on MSDN is compliant with this. It returns observer as IDisposable, i.e., it will either return (A) null, or (B) the observer parameter. (A) might be acceptable. Returning null from Subscribe is the same as telling the user code that there isn't any "unsubscriber" object available; (B) won't do, because the observer has no knowledge of the objects being observed. It can also cause a defect when one wants to remove a subscription and ends up disposing an observer object. BTW, the MSDN documentation doesn't specify anywhere what the Subscribe method is supposed to return. I only learned that after I watched your video with Kim Hamilton.

  • Anonymous
    December 14, 2009
    One correction. The documentation of the Subscribe method (http://msdn.microsoft.com/en-us/library/dd782981%28VS.100%29.aspx) does say what the method is supposed to return. So, this is not missing. Anyway, I think it should be cited in the IObservable<T> doc, and the example there should be fixed.

  • Anonymous
    December 23, 2009
    Please could you update the code so it works with the latest version of Rx? For example, Observable.Context = SynchronizationContexts.CurrentDispatcher; is no longer used.

  • Anonymous
    January 23, 2010
    Ditto:: Please could you update the code so it works with the latest version of Rx? For example, Observable.Context = SynchronizationContexts.CurrentDispatcher; is no longer used.

  • Anonymous
    May 11, 2010
    As Howard pointed out, the Observable.Context doesn't appear to be valid any more. You can get around that by using: timer = Observable.Interval(TimeSpan.FromSeconds(1)).ObserveOnDispatcher I don't see a global context in the current bits. Has it moved somewhere or do we need to handle the dispatcher on each subscription? Steel, the answer to your issue is that by using Function rather than Sub, the operation was doing a comparison and returning the boolean result rather than doing the intended assignment. This is a downside of not differentiating between = (assignment) and == (comparison) in VB.

  • Anonymous
    June 13, 2010
    Unfortunately  this is not an example of useful code. Drag and Drop operations need to capture the mouse. This example does not do this. Just release the mouse while you are off the application window and you'll find you are stuck in drag mode.
