Computing event duration using Rx

If you've looked at IIS logs, you've probably noticed that there are events for when a request is received and when the response is sent (see a typical request). It's useful to understand average duration per request if you're examining the health of your system. Alas, there's no "duration" event. Sure, you can parse the file and compute it post-facto, but wouldn't it be nice to compute an activity's duration as it is happening using Rx?

Good news everyone! We can! The trick is reactive coincidence. Head over to Channel 9. Stay here for examples and code.

Let's abstract out the duration pattern. Assume we have events of the following type:

public class Activity
{
    public long Id {get; private set;}
    public int Payload {get; private set;}      

    public Activity(long id)
    {
        Id = id;
        Payload = (new Random()).Next(0,100);
    }      

    public override string ToString()
    {
        return String.Format("ActivityId = {0}, Payload = {1}", Id, Payload);
    }
}

And let us generate a couple of observable sequences, one for starts and one for ends:

    var starts = Observable
                .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
                .Select(x => new Activity(x+5))
                .Timestamp()
                .Publish();      

    var ends = Observable
                .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1))
                .Select(x => new Activity(x))
                .Timestamp()
                .Publish();

Notice I'm offsetting the Ids in starts by 5 to simulate a duration of 5 seconds: the start with Id 5 fires at t = 0, but the end with Id 5 doesn't fire until roughly t = 5. Since I'm interested in computing duration, I need a handle on when events occurred. The Timestamp() operator in Rx returns an observable sequence of type Timestamped<T>, in which you have a Timestamp property (a DateTimeOffset) and the original T is inside a Value property. Neat!
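Here is a minimal sketch of what Timestamp() hands you. To keep it deterministic it assumes a HistoricalScheduler as a virtual clock and a hand-fed Subject<string> as the source; neither is part of the examples in this post, and the names clock, source and TimestampSketch are purely illustrative. It also assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class TimestampSketch
{
    // Feeds two values through Timestamp() and returns what the subscriber saw.
    public static List<Timestamped<string>> Run()
    {
        // Virtual clock (an assumption of this sketch): time only moves when we say so.
        var clock = new HistoricalScheduler(
            new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero));
        var source = new Subject<string>();
        var seen = new List<Timestamped<string>>();

        // Passing the scheduler makes Timestamp() read the virtual clock.
        using (source.Timestamp(clock).Subscribe(x => seen.Add(x)))
        {
            source.OnNext("first");
            clock.AdvanceBy(TimeSpan.FromSeconds(5)); // 5 virtual seconds pass
            source.OnNext("second");
        }
        return seen;
    }

    public static void Main()
    {
        foreach (var item in Run())
        {
            // Value is the original payload; Timestamp is when it was observed.
            Console.WriteLine("{0} observed at {1:o}", item.Value, item.Timestamp);
        }
    }
}
```

Subtracting the two Timestamp values gives a TimeSpan, which is all the duration queries need.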

An intuitive way to compute duration is by using a cross product and a where (or, as we data folks commonly call it, a join!):

    var crossed = from s in starts
                  from e in ends
                  where s.Value.Id.Equals(e.Value.Id)
                  select new
                  {
                      Id = s.Value.Id,
                      Duration = e.Timestamp.Subtract(s.Timestamp)
                  };
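If you'd rather not wait on real timers, the same query shape can be exercised against hand-fed subjects. This is only a sketch under a few assumptions: plain long Ids stand in for Activity, a HistoricalScheduler supplies virtual time, and startIds, endIds and CrossedSketch are made-up names. It assumes the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CrossedSketch
{
    // Returns (Id, Duration) pairs produced by the cross-product-plus-where query.
    public static List<Tuple<long, TimeSpan>> Run()
    {
        // Virtual clock (an assumption of this sketch) so durations are exact.
        var clock = new HistoricalScheduler(
            new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero));
        var startIds = new Subject<long>();
        var endIds = new Subject<long>();
        var starts = startIds.Timestamp(clock);
        var ends = endIds.Timestamp(clock);

        // Same shape as the query above: every start listens to all later ends.
        var crossed = from s in starts
                      from e in ends
                      where s.Value == e.Value
                      select Tuple.Create(s.Value, e.Timestamp - s.Timestamp);

        var results = new List<Tuple<long, TimeSpan>>();
        using (crossed.Subscribe(r => results.Add(r)))
        {
            startIds.OnNext(5);                        // t = 0s
            clock.AdvanceBy(TimeSpan.FromSeconds(2));
            startIds.OnNext(6);                        // t = 2s
            clock.AdvanceBy(TimeSpan.FromSeconds(3));
            endIds.OnNext(5);                          // t = 5s
            clock.AdvanceBy(TimeSpan.FromSeconds(1));
            endIds.OnNext(6);                          // t = 6s
        }
        return results;
    }

    public static void Main()
    {
        foreach (var r in Run())
        {
            Console.WriteLine("Id = {0}, Duration = {1}", r.Item1, r.Item2);
        }
    }
}
```

One thing worth noticing: each start subscribes to ends only once it arrives, so this form pairs a start with ends that come after it.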

 

A join, you say? Then it should be possible to also use the Join operator in Rx:

    var joined = starts.Join(ends
                             , _ => Observable.Never<Unit>()
                             , _ => Observable.Never<Unit>()
                             , (s, e) => new {Start = s, End = e})
                       .Where(t => t.Start.Value.Id == t.End.Value.Id)
                       .Select(t => new
                            {
                                Id = t.Start.Value.Id
                                , Duration = t.End.Timestamp.Subtract(t.Start.Timestamp)
                            });

What's going on here? The second and third parameters are duration selectors: for each start and each end, they return an observable that signals when that event's window closes. Here both return a non-terminating sequence, so we are saying the lifetime of events in both sequences is unknown when we evaluate the join; that is, we expect to be able to correlate them for as long as the sequences exist. This is exactly the implication in our crossed expression.

However, there is a variation of the duration pattern that also occurs: how much time elapsed between starts and ends, assuming we should see an end within n seconds? In some cases we don't want to wait forever for an answer, and would rather stop waiting for an end that doesn't occur within a time bound. Turns out we can use the Join operator to express that condition too:

    var boundJoined = starts.Join(ends
                                 , _ => Observable.Timer(TimeSpan.FromSeconds(6))
                                 , _ => Observable.Empty<Unit>()
                                 , (s, e) => new {Start = s, End = e})
                            .Where(t => t.Start.Value.Id == t.End.Value.Id)
                            .Select(t => new
                            {
                                Id = t.Start.Value.Id
                                , Duration = t.End.Timestamp.Subtract(t.Start.Timestamp)
                            });

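The second parameter expresses our assumption that start events are "valid" for 6 seconds. The third says that end events are instantaneous: Observable.Empty completes right away, so an end only matches starts whose windows are still open at the moment it arrives. A start whose end hasn't shown up within 6 seconds simply drops out of the join and produces no output. So now you have a template for computing duration given a stream of start events and a stream of end events.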
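To see the time bound at work without waiting on real timers, here is a sketch that drives the same Join shape by hand. The assumptions: long Ids stand in for Activity, a HistoricalScheduler supplies virtual time (and is passed to both Timestamp and Timer), and startIds, endIds and BoundedJoinSketch are illustrative names only. It assumes the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BoundedJoinSketch
{
    // Returns (Id, Duration) pairs; starts whose end arrives too late yield nothing.
    public static List<Tuple<long, TimeSpan>> Run()
    {
        // Virtual clock (an assumption of this sketch) shared by Timestamp and Timer.
        var clock = new HistoricalScheduler(
            new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero));
        var startIds = new Subject<long>();
        var endIds = new Subject<long>();

        var boundJoined = startIds.Timestamp(clock)
            .Join(endIds.Timestamp(clock),
                  _ => Observable.Timer(TimeSpan.FromSeconds(6), clock), // start window: 6s
                  _ => Observable.Empty<Unit>(),                         // end: point event
                  (s, e) => new { Start = s, End = e })
            .Where(t => t.Start.Value == t.End.Value)
            .Select(t => Tuple.Create(t.Start.Value,
                                      t.End.Timestamp - t.Start.Timestamp));

        var results = new List<Tuple<long, TimeSpan>>();
        using (boundJoined.Subscribe(r => results.Add(r)))
        {
            startIds.OnNext(1);                        // t = 0s
            startIds.OnNext(2);                        // t = 0s
            clock.AdvanceBy(TimeSpan.FromSeconds(5));
            endIds.OnNext(1);                          // t = 5s, inside the window
            clock.AdvanceBy(TimeSpan.FromSeconds(2));  // both start windows close at t = 6s
            endIds.OnNext(2);                          // t = 7s, too late
        }
        return results;
    }

    public static void Main()
    {
        foreach (var r in Run())
        {
            Console.WriteLine("Id = {0}, Duration = {1}", r.Item1, r.Item2);
        }
    }
}
```

Only Id 1 comes out the other side. If you need to report the starts that timed out rather than just drop them, that takes something beyond this inner-join shape.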

Try them out! (Don't forget to Connect() the starts and ends sequences once you've subscribed :) ) [LINQPad share]
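Why the Connect() reminder matters: a sequence returned by Publish() stays silent until it is connected. The sketch below shows that with a hand-fed Subject<int>; source, published and ConnectSketch are hypothetical names, and it assumes the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ConnectSketch
{
    // Returns the values a subscriber sees before, during and after a connection.
    public static List<int> Run()
    {
        var source = new Subject<int>();
        IConnectableObservable<int> published = source.Publish();
        var seen = new List<int>();

        using (published.Subscribe(x => seen.Add(x)))
        {
            source.OnNext(1);               // not seen: nothing is connected yet

            using (published.Connect())     // published now subscribes to source
            {
                source.OnNext(2);           // seen
                source.OnNext(3);           // seen
            }

            source.OnNext(4);               // not seen: the connection was disposed
        }
        return seen;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Subscribing before connecting is the safe order: anything published before a subscriber attaches is not replayed.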