How to express common windowing patterns using Rx

When processing continuous streams, one is usually interested in continuous answers broken into epochs or periods of time. Usually, the computations one formulates feel more like "compute the average speed per road segment every 5 minutes" than "what is the average speed in this road segment given all the data I've seen?" If you've been doing stream processing for a while, this is not news to you. In fact, you're probably thinking of windowed queries.

Earlier work on stream engines exposed me to a very practical definition of a window, in which the function that partitions the underlying stream is parameterized by two attributes: range and slide. Range defines what's in a window, and slide defines when new windows start*. In my experience, people often talk about three types of windows:

  1. Tumbling window (range = slide)
  2. Sliding window (range > slide)
  3. Hopping window (range < slide)

Additionally, people also distinguish whether windows are defined in terms of counts or in terms of time (Count vs. Temporal window). I'll try to illustrate these types of windows with a picture in Figure 1.

Figure 1. Windowing examples.

Let the x axis represent time. In Figure 1 (a), you'll see tumbling count-windows with range = slide = 2. Notice that a window opens and closes based on event occurrence. For the stream illustrated here, we get three windows, each with two elements in it. Now assume we define windowing in terms of seconds, with slide = range = 2 seconds. In Figure 1 (b), you see tumbling temporal-windows. Notice that windows open and close based on time instead of occurrence. Figure 1 (c) illustrates sliding count-windows with range = 2 and slide = 1, and Figure 1 (d) shows hopping count-windows with range = 1 and slide = 2. (Note: The Rx team likes to visualize operations using marble diagrams, which I don't use here, but may use later.)

Enough said! Let's look at some Rx code and Count windows. I realize I'm repeating content that exists in many other sources, but if you think of windows the way I do, this may help you get started expressing them using Rx. Consider the observable sequence xs in Code Fragment 1. The Window operator in Rx is one way we can implement both count-based and temporal-based windows of the three types I described. One of its overloads takes two parameters: count (range) and skip (slide), which we apply in the expression q in Code Fragment 1.

            var xs = new [] {0,0,1,2,3,2,2,3,4,5,6}.ToObservable();

            var q = from w in xs.Window(count:2, skip:2)
                    select w;

Code Fragment 1. Windowing an observable sequence.

The expression q results in a stream of type IObservable<IObservable<int>>, which is a stream of streams. Every window is therefore itself an observable stream to which I can subscribe. OK, I can't resist the temptation. Figure 2 shows the windowing in a marble diagram.

Figure 2. Marble diagram for the expression q.

Since each window is an observable, I can subscribe to them. In Code Fragment 2, I show how we can inspect each window and its contents and confirm this is a tumbling window by subscribing to each window. Output 1 shows the console output.

            int counter = 1;

            q.Subscribe(w =>
            {
                int i = counter++;
                w.Subscribe(
                    x => Console.WriteLine("Window: {0}; Value: {1}", i,x)
                    );
            });  

Code Fragment 2. Subscribing to each window.

            Window: 1; Value: 0
            Window: 1; Value: 0
            Window: 2; Value: 1
            Window: 2; Value: 2
            Window: 3; Value: 3
            Window: 3; Value: 2
            Window: 4; Value: 2
            Window: 4; Value: 3
            Window: 5; Value: 4
            Window: 5; Value: 5
            Window: 6; Value: 6

Output 1. Console output for the tumbling count-window.
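
The same count/skip overload can also express the other two count-window shapes from the taxonomy above. What follows is only a minimal sketch, assuming the System.Reactive NuGet package and C# top-level statements; the sequence ys and the helper Collect are hypothetical names introduced for illustration and are not part of Rx.

```csharp
// Sketch only: assumes the System.Reactive NuGet package and C# top-level statements.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical helper (not part of Rx): materializes every window as a list.
// SelectMany subscribes to each window as soon as it opens, so no values are missed.
static IList<IList<int>> Collect(IObservable<IObservable<int>> windows) =>
    windows.SelectMany(w => w.ToList()).ToList().Wait();

var ys = new[] { 1, 2, 3, 4, 5, 6 }.ToObservable();

// Sliding count-windows: range 2, slide 1, so consecutive windows overlap by one element.
var sliding = Collect(ys.Window(count: 2, skip: 1));

// Hopping count-windows: range 1, slide 2, so every other element falls between windows.
var hopping = Collect(ys.Window(count: 1, skip: 2));

foreach (var w in sliding) Console.WriteLine("Sliding: [{0}]", string.Join(", ", w));
foreach (var w in hopping) Console.WriteLine("Hopping: [{0}]", string.Join(", ", w));
```

On a finite sequence the last window or two may come out partial or even empty, much like the single-element Window 6 in Output 1, so an aggregate applied per window should probably be prepared for that.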

 

Usually we want to compute something over the contents of each window, such as an average. You can apply an aggregate function to each window and obtain a stream of results back, as shown in Code Fragment 3. The expression q2 computes the average of every window and results in a stream of type IObservable<double>. As before, we can inspect the stream by subscribing to it and printing out to console (see Output 2).

            var q2 = from w in xs.Window(2,2)
                     from x in w.Average()
                     select x;

            q2.Subscribe(avg =>
                Console.WriteLine("Average: {0}", avg)
                );

Code Fragment 3. Windowed Average.

            Average: 0
            Average: 1.5
            Average: 2.5
            Average: 2.5
            Average: 4.5
            Average: 6

Output 2. Windowed average over xs.

 

Temporal windows, you ask? No problem. Window has an overload that takes two TimeSpan objects, timeSpan (range) and timeShift (slide). Moreover, if you find the idea of a stream of streams a little daunting and you just want a stream of collections (with each collection, an IList<T>, representing a window), you can look at the Buffer operator.
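
As a rough illustration of the temporal case, the sketch below uses Buffer with the two TimeSpan parameters to form tumbling temporal windows and prints each one with its average. It assumes the System.Reactive NuGet package and C# top-level statements; the source ticks and the 50 ms / 200 ms values are arbitrary choices made for this example. Because the windows depend on real time, how the values are split across buffers can vary from run to run.

```csharp
// Sketch only: assumes the System.Reactive NuGet package and C# top-level statements.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical source: one value every 50 ms, ten values in total.
var ticks = Observable.Interval(TimeSpan.FromMilliseconds(50)).Take(10);

// Tumbling temporal windows delivered as lists: range = slide = 200 ms.
IList<IList<long>> buffers = ticks
    .Buffer(timeSpan: TimeSpan.FromMilliseconds(200),
            timeShift: TimeSpan.FromMilliseconds(200))
    .ToList()   // collect all buffers once the source completes
    .Wait();    // block until then (fine for a console demo)

foreach (var b in buffers)
{
    // A buffer can be empty if no value arrived during its time span.
    var avg = b.Count > 0 ? b.Average().ToString() : "n/a";
    Console.WriteLine("Buffer: [{0}]; Average: {1}", string.Join(", ", b), avg);
}
```

Choosing a timeShift smaller than timeSpan should give sliding temporal windows, and a larger one hopping windows, mirroring the count/skip overload.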

I keep encountering these simple patterns over and over again. Hopefully this taxonomy (Tumbling, Sliding, or Hopping on one dimension; Count or Temporal on the other) is useful to you. And as your stream processing needs grow, bear in mind that Window and Buffer do many more wonderful things, such as using one stream to signal closing a window on another**.
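
To give a flavor of that last point, here is a minimal sketch of one stream closing windows on another, using the Window overload that takes a sequence of window boundaries. It assumes the System.Reactive NuGet package and C# top-level statements; source, boundary, and results are hypothetical names, and subjects are used only so the timing of events is explicit.

```csharp
// Sketch only: assumes the System.Reactive NuGet package and C# top-level statements.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var source = new Subject<int>();     // the stream being windowed
var boundary = new Subject<Unit>();  // each value here closes the current window
var results = new List<IList<int>>();

source.Window(boundary)
      .SelectMany(w => w.ToList())   // turn each window into a list when it closes
      .Subscribe(list =>
      {
          results.Add(list);
          Console.WriteLine("Window: [{0}]", string.Join(", ", list));
      });

source.OnNext(1);
source.OnNext(2);
boundary.OnNext(Unit.Default);       // closes the window holding 1 and 2
source.OnNext(3);
boundary.OnNext(Unit.Default);       // closes the window holding 3
source.OnNext(4);
source.OnNext(5);
source.OnCompleted();                // completing the source closes the last window
```

In a real application the boundary stream would more likely be a timer, a user action, or another event stream rather than a hand-driven subject.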

---------------------------

 * Two nice papers that discuss various Windowing semantics:
- Li et al. Semantics and evaluation techniques for window aggregates in data streams. SIGMOD 05.
- Botan et al. SECRET: a model for analysis of the execution semantics of stream processing systems. VLDB 2010.

** Additional great reads on Window, Buffer, and Rx:
- Window and Buffer on MSDN.
- Campbell, Lee. Introduction to Rx. (Online book)

 

Thanks to Mark Simms, Christian Martinez, and Nina Sarawgi for their suggestions while preparing this post.

Comments

  • Anonymous
    January 21, 2013
    When calculating an average or a standard deviation over a sliding window, for example, one typically implements a circular buffer so that even if the calculation isn't online, data manipulation is kept to a minimum for speed purposes. That is, no new memory needs to be allocated and only a couple of operations are needed (insert the new element and update the index that marks the start of the buffer). Are any such optimisations available through Rx?

  • Anonymous
    January 21, 2013
    Not sure what you mean by "optimisations available through Rx". If you're asking whether Rx is implemented to be as fast as possible over observable collections, you can look at the code here: http://rx.codeplex.com. For things like variance, I usually compute them over the output of a Buffer, which is an IList<T>. Therefore, a simple extension method works :)

  • Anonymous
    June 30, 2015
    Ah, wonderful! Exactly what I was looking for!