Taming Hopping Windows
At first glance, hopping windows seem innocuous and obvious. They organize events into windows with a simple periodic definition: the windows have some duration d (e.g. each window covers a 5-second interval), an interval or period p (e.g. a new window starts every 2 seconds) and an alignment a (e.g. one of those windows starts at 12:00 PM on March 15, 2012 UTC).
var wins = xs
    .HoppingWindow(TimeSpan.FromSeconds(5),
                   TimeSpan.FromSeconds(2),
                   new DateTime(2012, 3, 15, 12, 0, 0, DateTimeKind.Utc));
Logically, there is a window with start time a + np and end time a + np + d for every integer n. That’s a lot of windows. So why doesn’t the following query (always) blow up?
var query = wins.Select(win => win.Count());
A few users have asked why StreamInsight doesn’t produce output for empty windows. Primarily it’s because there is an infinite number of empty windows! (Actually, StreamInsight uses DateTimeOffset.MaxValue to approximate “the end of time” and DateTimeOffset.MinValue to approximate “the beginning of time”, so the number of windows is finite in practice, though still enormous.)
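To get a feel for how many windows that still leaves, here is a rough back-of-the-envelope sketch. It assumes the 2-second period from the example above and simply divides the representable time range by the period; CountWindowStarts is a hypothetical helper, not part of StreamInsight, and the exact count can differ by one depending on the alignment.

```csharp
using System;

static class WindowCount
{
    // Approximate number of window start times a + n*p that fit between
    // DateTimeOffset.MinValue and DateTimeOffset.MaxValue (alignment ignored).
    public static long CountWindowStarts(TimeSpan period)
    {
        long span = DateTimeOffset.MaxValue.Ticks - DateTimeOffset.MinValue.Ticks;
        return span / period.Ticks;
    }

    static void Main()
    {
        // With a 2-second period this is on the order of 10^11 windows.
        Console.WriteLine(CountWindowStarts(TimeSpan.FromSeconds(2)));
    }
}
```

Even this finite number is far too large to emit one output event per window.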
That was the good news. Now the bad news. Events also have duration. Consider the following simple input:
var xs = this.Application
    .DefineEnumerable(() => new[]
        { EdgeEvent.CreateStart(DateTimeOffset.UtcNow, 0) })
    .ToStreamable(AdvanceTimeSettings.IncreasingStartTime);
Because the event has no explicit end edge, it lasts until the end of time. So there are lots of non-empty windows if we apply a hopping window to that single event! For this reason, we need to be careful with hopping window queries in StreamInsight. Or we can switch to a custom implementation of hopping windows that doesn’t suffer from this shortcoming.
The alternate window implementation produces output only when the input changes. We start by breaking up the timeline into non-overlapping intervals assigned to each window. In figure 1, six hopping windows (“Windows”) are assigned to six intervals (“Assignments”) in the timeline. Next we take input events (“Events”) and alter their lifetimes (“Altered Events”) so that they cover the intervals of the windows they intersect. In figure 1, you can see that the first event e1 intersects windows w1 and w2 so it is adjusted to cover assignments a1 and a2. Finally, we can use snapshot windows (“Snapshots”) to produce output for the hopping windows. Notice however that instead of having six windows generating output, we have only four. The first and second snapshots correspond to the first and second hopping windows. The remaining snapshots however cover two hopping windows each! While in this example we saved only two events, the savings can be more significant when the ratio of event duration to window duration is higher.
Figure 1: Timeline
The implementation of this strategy is straightforward. We need to set the start time of each event to the start time of the interval assigned to the earliest window that includes the event's start time. Similarly, we need to set the end time of each event to the end time of the interval assigned to the latest window that includes the event's end time. A snap-to-boundary function will be useful here: it rounds a timestamp t down to the nearest value t' <= t such that t' is a + np for some integer n. For convenience, we will represent both DateTime and TimeSpan values using long ticks:
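static long SnapToBoundary(long t, long a, long p)
{
    // C#'s % truncates toward zero, so normalize the remainder into [0, p);
    // this also keeps t unchanged when it already lies on a boundary at or before a
    long r = (t - a) % p;
    return t - (r < 0 ? r + p : r);
}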
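The rounding contract described above (largest a + np that is still <= t) is easy to check on small numbers. The following is a minimal sketch: SnapDown is a hypothetical stand-alone helper written with an explicit floor-style remainder, and the values a = 10 and p = 4 are made up for illustration. The cases to watch are timestamps before the alignment and timestamps that already sit exactly on a boundary.

```csharp
using System;

static class SnapSketch
{
    // Largest value a + n*p (n any integer) that is <= t.
    public static long SnapDown(long t, long a, long p)
    {
        long r = (t - a) % p;       // may be negative when t < a
        if (r < 0) r += p;          // normalize into [0, p)
        return t - r;
    }

    static void Main()
    {
        long a = 10, p = 4;         // boundaries: ..., 2, 6, 10, 14, 18, ...
        Console.WriteLine(SnapDown(15, a, p)); // 14
        Console.WriteLine(SnapDown(14, a, p)); // 14 (already on a boundary)
        Console.WriteLine(SnapDown(10, a, p)); // 10 (t equals the alignment)
        Console.WriteLine(SnapDown(7, a, p));  // 6  (t before the alignment)
        Console.WriteLine(SnapDown(6, a, p));  // 6
    }
}
```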
How do we find the earliest window including the start time for an event? It’s the window following the last window that does not include the start time, assuming that there are no gaps between the windows (i.e. interval <= duration), which is a limitation of this solution. To find the end time of that antecedent window, we need to know the alignment of window ends:
long e = a + (d % p);
Using the window end alignment, we are finally ready to describe the start time selector:
static long AdjustStartTime(long t, long e, long p)
{
    return SnapToBoundary(t, e, p) + p;
}
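To find the latest window including the end time for an event, we look for the last window that starts strictly before the end time (event end times are exclusive, hence the t - 1) and take the end of the interval assigned to it: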
public static long AdjustEndTime(long t, long a, long d, long p)
{
    return SnapToBoundary(t - 1, a, p) + p + d;
}
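The two selectors can be tried out on plain integers before wiring them into a query. The sketch below restates them in a self-contained form under a few assumptions: time is measured in toy units rather than ticks, the alignment is 0, the duration is 5 and the period is 2, and Snap, AdjustStart and AdjustEnd are hypothetical stand-ins for the functions above (with a floor-style remainder in the snap).

```csharp
using System;

static class LifetimeSketch
{
    // Floor-style snap: largest a + n*p that is <= t.
    static long Snap(long t, long a, long p)
    {
        long r = (t - a) % p;
        return t - (r < 0 ? r + p : r);
    }

    // Start of the interval assigned to the earliest window containing t.
    public static long AdjustStart(long t, long a, long d, long p)
    {
        long e = a + (d % p);       // alignment of window ends
        return Snap(t, e, p) + p;
    }

    // End of the interval assigned to the latest window starting before t.
    public static long AdjustEnd(long t, long a, long d, long p)
    {
        return Snap(t - 1, a, p) + p + d;
    }

    static void Main()
    {
        long a = 0, d = 5, p = 2;   // assumed toy units instead of ticks
        var events = new (long Start, long End)[] { (1, 2), (3, 10), (9, 10) };
        foreach (var (start, end) in events)
        {
            Console.WriteLine(
                $"[{start}, {end}) -> [{AdjustStart(start, a, d, p)}, {AdjustEnd(end, a, d, p)})");
        }
        // [1, 2) -> [3, 7)
        // [3, 10) -> [5, 15)
        // [9, 10) -> [11, 15)
    }
}
```

Note that the events [3, 10) and [9, 10) end up with the same adjusted end time, because the same window is the last one to include both end times.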
Bringing it together, we can define the translation from events to ‘altered events’ as in Figure 1:
public static IQStreamable<T> SnapToWindowIntervals<T>(IQStreamable<T> source, TimeSpan duration, TimeSpan interval, DateTime alignment)
{
    if (source == null) throw new ArgumentNullException("source");
    // reason about DateTime and TimeSpan in ticks
    long d = Math.Min(DateTime.MaxValue.Ticks, duration.Ticks);
    long p = Math.Min(DateTime.MaxValue.Ticks, Math.Abs(interval.Ticks));
    // verify constraints of this solution (before p is used as a divisor)
    if (d <= 0L) { throw new ArgumentOutOfRangeException("duration"); }
    if (p == 0L || p > d) { throw new ArgumentOutOfRangeException("interval"); }
    // set alignment to earliest possible window
    var a = alignment.ToUniversalTime().Ticks % p;
    // find the alignment of window ends
    long e = a + (d % p);
    return source.AlterEventLifetime(
        evt => ToDateTime(AdjustStartTime(evt.StartTime.ToUniversalTime().Ticks, e, p)),
        evt => ToDateTime(AdjustEndTime(evt.EndTime.ToUniversalTime().Ticks, a, d, p)) -
               ToDateTime(AdjustStartTime(evt.StartTime.ToUniversalTime().Ticks, e, p)));
}
public static DateTime ToDateTime(long ticks)
{
    // just snap to min or max value rather than under/overflowing
    return ticks < DateTime.MinValue.Ticks
        ? new DateTime(DateTime.MinValue.Ticks, DateTimeKind.Utc)
        : ticks > DateTime.MaxValue.Ticks
            ? new DateTime(DateTime.MaxValue.Ticks, DateTimeKind.Utc)
            : new DateTime(ticks, DateTimeKind.Utc);
}
Finally, we can describe our custom hopping window operator:
public static IQWindowedStreamable<T> HoppingWindow2<T>(
    IQStreamable<T> source,
    TimeSpan duration,
    TimeSpan interval,
    DateTime alignment)
{
    if (source == null) { throw new ArgumentNullException("source"); }
    return SnapToWindowIntervals(source, duration, interval, alignment).SnapshotWindow();
}
By switching from HoppingWindow to HoppingWindow2 in the following example, the query returns quickly rather than gobbling resources and ultimately failing!
public void Main()
{
    var start = new DateTimeOffset(new DateTime(2012, 6, 28), TimeSpan.Zero);
    var duration = TimeSpan.FromSeconds(5);
    var interval = TimeSpan.FromSeconds(2);
    var alignment = new DateTime(2012, 3, 15, 12, 0, 0, DateTimeKind.Utc);
    var events = this.Application.DefineEnumerable(() => new[]
    {
        EdgeEvent.CreateStart(start.AddSeconds(0), "e0"),
        EdgeEvent.CreateStart(start.AddSeconds(1), "e1"),
        EdgeEvent.CreateEnd(start.AddSeconds(1), start.AddSeconds(2), "e1"),
        EdgeEvent.CreateStart(start.AddSeconds(3), "e2"),
        EdgeEvent.CreateStart(start.AddSeconds(9), "e3"),
        EdgeEvent.CreateEnd(start.AddSeconds(3), start.AddSeconds(10), "e2"),
        EdgeEvent.CreateEnd(start.AddSeconds(9), start.AddSeconds(10), "e3"),
    }).ToStreamable(AdvanceTimeSettings.IncreasingStartTime);
    var adjustedEvents = SnapToWindowIntervals(events, duration, interval, alignment);
    var query = from win in HoppingWindow2(events, duration, interval, alignment)
                select win.Count();
    DisplayResults(adjustedEvents, "Adjusted Events");
    DisplayResults(query, "Query");
}
As you can see, instead of producing a massive number of windows for the open start edge e0, a single window is emitted from 12:00:15 AM until the end of time:
Adjusted Events
StartTime | EndTime | Payload
6/28/2012 12:00:01 AM | 12/31/9999 11:59:59 PM | e0
6/28/2012 12:00:03 AM | 6/28/2012 12:00:07 AM | e1
6/28/2012 12:00:05 AM | 6/28/2012 12:00:15 AM | e2
6/28/2012 12:00:11 AM | 6/28/2012 12:00:15 AM | e3
Query
StartTime | EndTime | Payload
6/28/2012 12:00:01 AM | 6/28/2012 12:00:03 AM | 1
6/28/2012 12:00:03 AM | 6/28/2012 12:00:05 AM | 2
6/28/2012 12:00:05 AM | 6/28/2012 12:00:07 AM | 3
6/28/2012 12:00:07 AM | 6/28/2012 12:00:11 AM | 2
6/28/2012 12:00:11 AM | 6/28/2012 12:00:15 AM | 3
6/28/2012 12:00:15 AM | 12/31/9999 11:59:59 PM | 1
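The counts in the Query output can be cross-checked without StreamInsight by sweeping over the adjusted lifetimes by hand. The sketch below is an illustration only and not how the engine implements snapshot windows. It assumes lifetimes are given in seconds after midnight, uses long.MaxValue as a stand-in for the end of time, and SnapshotSketch.Snapshots is a hypothetical helper: it cuts the timeline at every event start or end and counts the events that cover each resulting piece.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class SnapshotSketch
{
    // One output per maximal interval over which the set of live events is constant.
    public static List<(long Start, long End, int Count)> Snapshots(
        IEnumerable<(long Start, long End)> events)
    {
        var list = events.ToList();
        var edges = list.SelectMany(ev => new[] { ev.Start, ev.End })
                        .Distinct().OrderBy(t => t).ToList();
        var result = new List<(long Start, long End, int Count)>();
        for (int i = 0; i + 1 < edges.Count; i++)
        {
            long lo = edges[i], hi = edges[i + 1];
            int count = list.Count(ev => ev.Start <= lo && ev.End >= hi);
            if (count > 0) result.Add((lo, hi, count)); // empty pieces produce no output
        }
        return result;
    }

    static void Main()
    {
        const long EndOfTime = long.MaxValue;   // assumed stand-in for 12/31/9999
        var adjusted = new (long Start, long End)[]
        {
            (1, EndOfTime),  // e0
            (3, 7),          // e1
            (5, 15),         // e2
            (11, 15),        // e3
        };
        foreach (var s in Snapshots(adjusted))
        {
            Console.WriteLine($"{s.Start} | {(s.End == EndOfTime ? "end of time" : s.End.ToString())} | {s.Count}");
        }
    }
}
```

The sweep yields six snapshots with counts 1, 2, 3, 2, 3 and 1, matching the Query output, with the last snapshot running from second 15 to the end of time.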
Regards,
The StreamInsight Team