

Curious about IObservable? Here’s a quick example to get you started!

Have you heard about the IObservable/IObserver support in Microsoft StreamInsight 1.1? Then you probably want to try it out. If this is your first encounter with the IObservable/IObserver pattern, this blog post is for you!

StreamInsight 1.1 introduced the ability to use IEnumerable and IObservable objects as event sources and sinks. The IEnumerable case is pretty straightforward, since many data collections are already exposed as this type, and Colin has already covered it in his blog. Creating your own IObservable event source is a little more involved but no less exciting. Here is a primer:

First, let’s look at a very simple observable data source. All it does is publish a random integer to its registered observers at regular intervals. (For more information on IObservable, see https://msdn.microsoft.com/en-us/library/dd990377.aspx.)

    using System;
    using System.Collections.Generic;
    using System.Threading;

    sealed class RandomSubject : IObservable<int>, IDisposable
    {
        private bool _done;
        private readonly List<IObserver<int>> _observers;
        private readonly Random _random;
        private readonly object _sync;
        private readonly Timer _timer;
        private readonly int _timerPeriod;

        /// <summary>
        /// Observable subject that emits a random integer in [0, 100) at regular intervals.
        /// </summary>
        /// <param name="timerPeriod">Timer period (in milliseconds)</param>
        public RandomSubject(int timerPeriod)
        {
            _done = false;
            _observers = new List<IObserver<int>>();
            _random = new Random();
            _sync = new object();
            _timer = new Timer(EmitRandomValue);
            _timerPeriod = timerPeriod;
            Schedule();
        }

        public IDisposable Subscribe(IObserver<int> observer)
        {
            lock (_sync)
            {
                _observers.Add(observer);
            }
            return new Subscription(this, observer);
        }

        public void OnNext(int value)
        {
            lock (_sync)
            {
                if (!_done)
                {
                    // Iterate over a snapshot so an observer may unsubscribe from within its callback.
                    foreach (var observer in _observers.ToArray())
                    {
                        observer.OnNext(value);
                    }
                }
            }
        }

        public void OnError(Exception e)
        {
            lock (_sync)
            {
                // Send at most one terminal notification.
                if (_done) return;
                _done = true;
                foreach (var observer in _observers.ToArray())
                {
                    observer.OnError(e);
                }
            }
        }

        public void OnCompleted()
        {
            lock (_sync)
            {
                // Send at most one terminal notification.
                if (_done) return;
                _done = true;
                foreach (var observer in _observers.ToArray())
                {
                    observer.OnCompleted();
                }
            }
        }

        void IDisposable.Dispose()
        {
            lock (_sync)
            {
                // Mark the subject as done so Schedule() never touches the disposed timer.
                _done = true;
                _timer.Dispose();
            }
        }

        // Arms the timer for a single tick; EmitRandomValue re-arms it after each value.
        private void Schedule()
        {
            lock (_sync)
            {
                if (!_done)
                {
                    _timer.Change(_timerPeriod, Timeout.Infinite);
                }
            }
        }

        private void EmitRandomValue(object _)
        {
            var value = _random.Next(100);
            Console.WriteLine("[Observable]\t" + value);
            OnNext(value);
            Schedule();
        }

        private sealed class Subscription : IDisposable
        {
            private readonly RandomSubject _subject;
            private IObserver<int> _observer;

            public Subscription(RandomSubject subject, IObserver<int> observer)
            {
                _subject = subject;
                _observer = observer;
            }

            public void Dispose()
            {
                // Swap the reference out atomically so the observer is removed only once.
                IObserver<int> observer = Interlocked.Exchange(ref _observer, null);
                if (null != observer)
                {
                    lock (_subject._sync)
                    {
                        _subject._observers.Remove(observer);
                    }
                }
            }
        }
    }
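
To see the observer side of the pattern in isolation, here is a minimal sketch that uses only the base class library and no StreamInsight. RecordingObserver, FixedSource and ObserverDemo are hypothetical names introduced for this illustration. FixedSource stands in for RandomSubject and pushes a fixed list of values synchronously, so the sequence of calls is easy to follow.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer for illustration: records each notification as text.
sealed class RecordingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError(" + error.Message + ")"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

// Hypothetical stand-in source: pushes its values synchronously during
// Subscribe, then completes. It has no timer and no shared state.
sealed class FixedSource : IObservable<int>
{
    private readonly int[] _values;

    public FixedSource(params int[] values) { _values = values; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        foreach (var value in _values)
        {
            observer.OnNext(value);
        }
        observer.OnCompleted();
        return new NoOpSubscription();
    }

    // Nothing to unsubscribe from: all notifications were already delivered.
    private sealed class NoOpSubscription : IDisposable
    {
        public void Dispose() { }
    }
}

static class ObserverDemo
{
    public static List<string> Run()
    {
        var observer = new RecordingObserver();
        using (new FixedSource(3, 1, 4).Subscribe(observer))
        {
            // The source has already delivered everything by the time Subscribe returns.
        }
        return observer.Log;
    }
}
```

Calling ObserverDemo.Run() leaves three OnNext entries followed by a single OnCompleted entry in the log. That mirrors the contract an observer can rely on: any number of OnNext calls, then at most one OnCompleted or OnError.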

 

So far, so good. Now let’s write a program that consumes the data emitted by the observable as a stream of point events in a StreamInsight query. First, let’s define our payload type:

    class Payload
    {
        public int Value { get; set; }

        public override string ToString()
        {
            return "[StreamInsight]\tValue: " + Value;
        }
    }

 

Now, let’s write the program. First, we instantiate the observable subject with a timer period of 500 milliseconds. Then we use the ToPointStream() method to consume it as a stream of point events, stamping each value with the current time. We can now write any query over the source; here, a simple pass-through query.

    using System;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Linq;

    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Starting observable source...");
            using (var source = new RandomSubject(500))
            {
                Console.WriteLine("Started observable source.");
                using (var server = Server.Create("Default"))
                {
                    var application = server.CreateApplication("My Application");

                    // Wrap each integer in a point event stamped with the current time.
                    var stream = source.ToPointStream(application,
                        e => PointEvent.CreateInsert(DateTime.Now, new Payload { Value = e }),
                        AdvanceTimeSettings.StrictlyIncreasingStartTime,
                        "Observable Stream");

                    // Pass-through query: every input event appears unchanged in the output.
                    var query = from e in stream
                                select e;

                    [...]

 

We’re done with consuming input and querying it! But you probably want to see the output of the query. Did you know you can turn a query into an observable subject as well? Let’s do precisely that, and exploit the Reactive Extensions for .NET (https://msdn.microsoft.com/en-us/devlabs/ee794896.aspx) to quickly visualize the output. Notice we’re subscribing “Console.WriteLine()” to the query, a pattern you may find useful for quick debugging of your queries. Reminder: you’ll need to install the Reactive Extensions for .NET (Rx for .NET Framework 4.0), and reference System.CoreEx and System.Reactive in your project.

                    [...]

                    Console.ReadLine();
                    Console.WriteLine("Starting query...");
                    // Subscribing starts the query; disposing the subscription stops it.
                    using (query.ToObservable().Subscribe(Console.WriteLine))
                    {
                        Console.WriteLine("Started query.");
                        Console.ReadLine();
                        Console.WriteLine("Stopping query...");
                    }
                    Console.WriteLine("Stopped query.");
                }
                Console.ReadLine();
                Console.WriteLine("Stopping observable source...");
                // Tell the observers that no more values will follow.
                source.OnCompleted();
            }
            Console.WriteLine("Stopped observable source.");
        }
    }
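
The Subscribe(Console.WriteLine) call relies on an Rx extension method that accepts a delegate instead of a full IObserver. As a rough sketch of how such an adapter can be built with the base class library alone (this is not Rx's actual implementation), the hypothetical DelegateObserver and SubscribeWith below wrap an Action<T> in an observer. ArraySource and DelegateDemo are also hypothetical; ArraySource stands in for the query output so the example runs without StreamInsight.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical adapter: forwards each OnNext call to a delegate.
sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;

    public DelegateObserver(Action<T> onNext)
    {
        if (onNext == null) throw new ArgumentNullException("onNext");
        _onNext = onNext;
    }

    public void OnNext(T value) { _onNext(value); }

    // Simplification for this sketch: report the error instead of rethrowing it.
    public void OnError(Exception error) { Console.Error.WriteLine(error); }

    public void OnCompleted() { }
}

static class DelegateSubscribeExtensions
{
    // Hypothetical name, chosen so it is not confused with Rx's Subscribe overloads.
    public static IDisposable SubscribeWith<T>(this IObservable<T> source, Action<T> onNext)
    {
        return source.Subscribe(new DelegateObserver<T>(onNext));
    }
}

// Hypothetical stand-in for a query's output: pushes its items, then completes.
sealed class ArraySource<T> : IObservable<T>
{
    private readonly T[] _items;

    public ArraySource(params T[] items) { _items = items; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items)
        {
            observer.OnNext(item);
        }
        observer.OnCompleted();
        return new NoOpSubscription();
    }

    private sealed class NoOpSubscription : IDisposable
    {
        public void Dispose() { }
    }
}

static class DelegateDemo
{
    public static List<string> Run()
    {
        var seen = new List<string>();
        var source = new ArraySource<string>("a", "b");
        // Any Action<string> fits here, including Console.WriteLine.
        using (source.SubscribeWith(seen.Add))
        {
        }
        return seen;
    }
}
```

Passing a method group such as Console.WriteLine works because the element type is inferred from the source, which then selects the matching overload. For quick debugging this saves writing a dedicated observer class.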

 

We hope this blog post gets you started. And for bonus points, you can go ahead and rewrite the observable source (the RandomSubject class) using the Reactive Extensions for .NET! The entire sample project is attached to this article.

Happy querying!

Regards,
The StreamInsight Team

ObservableExample.zip