101’ish LINQ Samples for StreamInsight (part 1 - filtering and aggregation)

Following in the great tradition of other learn-by-example LINQ pages, such as 101 LINQ Samples, here’s the first in a series of posts on StreamInsight query examples.  These will first show up on this blog, then get migrated over to a centralized site on the StreamInsight developer center.  We’ve got a fairly hefty queue we’re working through, but if there are specific examples or query patterns of interest, please post them in the comments.

Each query pattern will consist of a description of the input set (.csv file samples for most of these, for ease of use), the full query and the output result.  For more complicated query structures the query pattern will also be described visually, using a Visio stencil (to be described in future posts). 

The full application framework that I use to run all of these queries, bind them to their data sources, and so on is available here.  The techniques used in this codebase will be covered in a future post.

In this post:


[Where] - How do I filter a stream, keeping only specific events?

 var filterQuery = from e in inputStream
    where e.Value > 20
    select e;

[Input] Result:

 SimpleFilter,Point,12:00:02.000,1001,77
 SimpleFilter,Point,12:00:03.000,1001,44
 SimpleFilter,Point,12:00:04.000,1001,22
 SimpleFilter,Point,12:00:05.000,1001,51
 SimpleFilter,Point,12:00:06.000,1001,46
 SimpleFilter,Point,12:00:07.000,1001,71
 SimpleFilter,Point,12:00:08.000,1001,37
 SimpleFilter,Point,12:00:09.000,1001,45

How do I filter on multiple conditions?

 var filterQuery = from e in inputStream
                   where e.Value > 70 && e.SensorId == 1001
                   select e;

[Input] Result:

 MediumFilter,Point,12:00:01.001,1001,77
 MediumFilter,Point,12:00:03.010,1001,72
 MediumFilter,Point,12:00:12.082,1001,73
 MediumFilter,Point,12:00:14.145,1001,75
 MediumFilter,Point,12:00:14.154,1001,71
 MediumFilter,Point,12:00:15.163,1001,74
 MediumFilter,Point,12:00:15.172,1001,73

How can I calculate aggregates, such as averages, over a set of events?

All aggregates, such as averages, min, max, etc are calculated over a set of events.  Sets of events in StreamInsight are defined as those events falling into a window.  Windows operate over a period of time (hopping, tumbling and sliding), or over a specific number of events (count).
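To make the window idea concrete outside the engine, here is a rough sketch in plain LINQ to Objects (not the StreamInsight API) that buckets point events into 5-second tumbling windows and averages each bucket.  The sample events and the `WindowStart` helper are made up for illustration; they are not the post's input data, and the real engine works incrementally over a live stream rather than over a list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical point events (timestamp, value); not the sample .csv data.
var t0 = new DateTime(2010, 1, 1, 12, 0, 0);
var events = new List<(DateTime Time, double Value)>
{
    (t0.AddSeconds(0), 10),
    (t0.AddSeconds(1), 20),
    (t0.AddSeconds(4), 30),
    (t0.AddSeconds(5), 40),
    (t0.AddSeconds(9), 60),
};

var size = TimeSpan.FromSeconds(5);

// A tumbling window is a fixed-size, non-overlapping bucket, so each event
// belongs to exactly one window: the one whose start is the event timestamp
// rounded down to a multiple of the window size.
DateTime WindowStart(DateTime t) => new DateTime(t.Ticks - (t.Ticks % size.Ticks));

var averages = events
    .GroupBy(e => WindowStart(e.Time))
    .OrderBy(g => g.Key)
    .Select(g => (Start: g.Key, Average: g.Average(e => e.Value)))
    .ToList();

foreach (var w in averages)
    Console.WriteLine($"{w.Start:HH:mm:ss.fff},{w.Average}");
```

With this made-up input the first window holds the events at 0, 1 and 4 seconds and the second holds the events at 5 and 9 seconds.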

How can I calculate the average of all events in the last 5 seconds?

 var query = from window in inputStream.TumblingWindow(
                 TimeSpan.FromSeconds(5),
                 HoppingWindowOutputPolicy.ClipToWindowEnd)
             select new
             {
                 Average = window.Avg(e => e.Value)
             };

[Input] Result:

 SimpleTumblingWindow,Point,12:00:00.000,32.2
 SimpleTumblingWindow,Point,12:00:05.000,50

How can I calculate the average of all events in the past 5 seconds, every 2 seconds?

 var query = from window in inputStream.HoppingWindow(
                TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(2), 
                HoppingWindowOutputPolicy.ClipToWindowEnd)
            select new
            {
                Average = window.Avg(e => e.Value)
            };

[Input] Result:

 SimpleHoppingWindow,Point,11:59:56.000,14
 SimpleHoppingWindow,Point,11:59:58.000,31.66666
 SimpleHoppingWindow,Point,12:00:00.000,32.2
 SimpleHoppingWindow,Point,12:00:02.000,48
 SimpleHoppingWindow,Point,12:00:04.000,45.4
 SimpleHoppingWindow,Point,12:00:06.000,49.75
 SimpleHoppingWindow,Point,12:00:08.000,41
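The first result is stamped 11:59:56, before any event arrived, because a 5-second window that starts then still covers 12:00:00.  The sketch below, again plain LINQ to Objects rather than the StreamInsight API, shows one way to picture that: each event is assigned to every hopping window that covers it.  The events and the `WindowStartsFor` helper are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical point events (timestamp, value); not the sample .csv data.
var t0 = new DateTime(2010, 1, 1, 12, 0, 0);
var events = new List<(DateTime Time, double Value)>
{
    (t0.AddSeconds(0), 10),
    (t0.AddSeconds(1), 20),
    (t0.AddSeconds(3), 60),
};

var size = TimeSpan.FromSeconds(5);
var hop = TimeSpan.FromSeconds(2);

// Windows start at every multiple of the hop and overlap because size > hop,
// so a single event can fall into several windows.
IEnumerable<DateTime> WindowStartsFor(DateTime t)
{
    // Latest window start at or before t.
    var start = new DateTime(t.Ticks - (t.Ticks % hop.Ticks));
    // Walk backwards while the window [start, start + size) still covers t.
    for (; start + size > t; start -= hop)
        yield return start;
}

var averages = events
    .SelectMany(e => WindowStartsFor(e.Time), (e, start) => (Start: start, e.Value))
    .GroupBy(x => x.Start)
    .OrderBy(g => g.Key)
    .Select(g => (Start: g.Key, Average: g.Average(x => x.Value)))
    .ToList();

foreach (var w in averages)
    Console.WriteLine($"{w.Start:HH:mm:ss.fff},{w.Average}");
```

With this input the earliest window starts four seconds before the first event, mirroring the 11:59:56 row in the real output.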

How can I calculate the average of all events in the past 5 seconds, every time a new event arrives?

 var query = from window in inputStream
                 .AlterEventDuration(e => TimeSpan.FromSeconds(5))
                 .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
             select new
             {
                 Average = window.Avg(e => e.Value)
             };

[Input] Result:

 SimpleSlidingWindow,Point,12:00:00.000,14
 SimpleSlidingWindow,Point,12:00:01.000,9
 SimpleSlidingWindow,Point,12:00:02.000,31.66
 SimpleSlidingWindow,Point,12:00:03.000,34.75
 SimpleSlidingWindow,Point,12:00:04.000,32.2
 SimpleSlidingWindow,Point,12:00:05.000,39.6
 SimpleSlidingWindow,Point,12:00:06.000,48
 SimpleSlidingWindow,Point,12:00:07.000,46.8
 SimpleSlidingWindow,Point,12:00:08.000,45.4
 SimpleSlidingWindow,Point,12:00:09.000,50
 SimpleSlidingWindow,Point,12:00:10.000,49.75
 SimpleSlidingWindow,Point,12:00:11.000,51
 SimpleSlidingWindow,Point,12:00:12.000,41
 SimpleSlidingWindow,Point,12:00:13.000,45
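The combination of stretching each event to 5 seconds and then taking snapshots can be hard to picture, so here is a small sketch of how I think about it, in plain LINQ to Objects (not the StreamInsight API).  A snapshot is treated as the span between two consecutive lifetime edges (an event starting or expiring), and the average covers every event alive during that span.  The three values are picked so the first averages line up with the first three results above; they are an assumption, not copied from the input file, and later snapshots differ because the real stream keeps receiving events.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumed sample events; chosen to be consistent with the first results above.
var t0 = new DateTime(2010, 1, 1, 12, 0, 0);
var events = new List<(DateTime Time, double Value)>
{
    (t0.AddSeconds(0), 14),
    (t0.AddSeconds(1), 4),
    (t0.AddSeconds(2), 77),
};

// Equivalent of stretching each point event into a 5-second lifetime.
var duration = TimeSpan.FromSeconds(5);
var lifetimes = events
    .Select(e => (Start: e.Time, End: e.Time + duration, e.Value))
    .ToList();

// Every start or end of a lifetime is an edge where the set of live events changes.
var edges = lifetimes
    .SelectMany(l => new[] { l.Start, l.End })
    .Distinct()
    .OrderBy(t => t)
    .ToList();

// Each pair of consecutive edges bounds one snapshot; average the events alive in it.
var snapshots = edges
    .Zip(edges.Skip(1), (begin, end) => (Begin: begin, End: end))
    .Select(s => (s.Begin, Values: lifetimes
        .Where(l => l.Start < s.End && l.End > s.Begin)
        .Select(l => l.Value)
        .ToList()))
    .Where(s => s.Values.Count > 0)
    .Select(s => (Start: s.Begin, Average: s.Values.Average()))
    .ToList();

foreach (var s in snapshots)
    Console.WriteLine($"{s.Start:HH:mm:ss.fff},{s.Average}");
```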

How can I calculate the average of the past 5 events?

Aggregates over a set number of events are produced using count windows.  As of StreamInsight v1, count windows cannot use the built-in aggregate functions (Count, Avg, Min, Max); they must use user-defined aggregates instead.  The code sample below pairs a count window with a simple user-defined averaging function.

 var query = from window in inputStream.CountByStartTimeWindow(
            5, CountWindowOutputPolicy.PointAlignToWindowEnd)
        select new
        {
            Average = window.UserDefinedAverage(e => e.Value)
        };

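 // User-defined aggregate: receives the mapped payload values of one window.
 public class SimpleAggregate : CepAggregate<double, double>
 {
     public override double GenerateOutput(IEnumerable<double> payloads)
     {
         return payloads.Average();
     }
 }

 public static class MyExtensions
 {
     // LINQ surface for the aggregate. The body never runs; the attribute
     // tells the engine to evaluate SimpleAggregate instead.
     [CepUserDefinedAggregate(typeof(SimpleAggregate))]
     public static double UserDefinedAverage<InputT>(this CepWindow<InputT> window,
         Expression<Func<InputT, double>> map)
     {
         throw CepUtility.DoNotCall();
     }
 }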

[Input] Result:

 SimpleCountWindow,Point,12:00:04.000,32.2
 SimpleCountWindow,Point,12:00:05.000,39.6
 SimpleCountWindow,Point,12:00:06.000,48
 SimpleCountWindow,Point,12:00:07.000,46.8
 SimpleCountWindow,Point,12:00:08.000,45.4
 SimpleCountWindow,Point,12:00:09.000,50
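For comparison, a count window over the last 5 events can be approximated in plain LINQ to Objects (not the StreamInsight API) by sliding over the list five events at a time and stamping each average with the time of the last event in the window.  This sketch assumes every event has a distinct start time, and the values are picked to be consistent with the first two results above rather than copied from the input file.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumed sample values, one event per second starting at 12:00:00.
var t0 = new DateTime(2010, 1, 1, 12, 0, 0);
double[] values = { 14, 4, 77, 44, 22, 51 };
var events = values
    .Select((v, i) => (Time: t0.AddSeconds(i), Value: v))
    .ToList();

const int n = 5;

// One window per run of n consecutive events; no output until n events exist.
var averages = Enumerable.Range(0, Math.Max(0, events.Count - n + 1))
    .Select(i => events.Skip(i).Take(n).ToList())
    .Select(w => (Time: w.Last().Time, Average: w.Average(e => e.Value)))
    .ToList();

foreach (var a in averages)
    Console.WriteLine($"{a.Time:HH:mm:ss.fff},{a.Average}");
```

The first output only appears at the fifth event, which is why the real result set starts at 12:00:04.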

How do I measure the rate of arriving events every two seconds?

 var query = from window in inputStream.TumblingWindow(
                 TimeSpan.FromSeconds(2),
                 HoppingWindowOutputPolicy.ClipToWindowEnd)
             select new
             {
                 Count = window.Count()
             };

[Input] Result:

 SimpleEventCount,Point,12:00:00.000,54
 SimpleEventCount,Point,12:00:02.000,54
 SimpleEventCount,Point,12:00:04.000,54
 SimpleEventCount,Point,12:00:06.000,54
 SimpleEventCount,Point,12:00:08.000,54
 SimpleEventCount,Point,12:00:10.000,54
 SimpleEventCount,Point,12:00:12.000,54
 SimpleEventCount,Point,12:00:14.000,54
 SimpleEventCount,Point,12:00:16.000,54

 

How can I calculate the average of each group of events in the last 10 seconds?

For this example, we’ll calculate the average for each sensor every 10 seconds.  We do this by first grouping the events by sensor ID, then applying the same windowed Avg() used in the ungrouped examples, with the group key (in this case, the sensor ID) added to the output.

 var query = from e in inputStream
             group e by e.SensorId into sensorGroups
             from window in sensorGroups.TumblingWindow(
                 TimeSpan.FromSeconds(10),
                 HoppingWindowOutputPolicy.ClipToWindowEnd)
             select new
             {
                 SensorId = sensorGroups.Key,
                 Average = window.Avg(e => e.Value)
             };

[Input] Result:

 GroupedTumblingWindow,Point,12:00:00.000,42.1,1001
 GroupedTumblingWindow,Point,12:00:00.000,34.6,1002
 GroupedTumblingWindow,Point,12:00:00.000,39.7,1003
 GroupedTumblingWindow,Point,12:00:00.000,50.9,1004
 GroupedTumblingWindow,Point,12:00:00.000,30.7,1005
 GroupedTumblingWindow,Point,12:00:00.000,38.3,1006
 GroupedTumblingWindow,Point,12:00:00.000,36.8,1007
 GroupedTumblingWindow,Point,12:00:00.000,41.7,1008
 GroupedTumblingWindow,Point,12:00:00.000,34.4,1009
 GroupedTumblingWindow,Point,12:00:10.000,46.375,1001
 GroupedTumblingWindow,Point,12:00:10.000,40.2,1002
 GroupedTumblingWindow,Point,12:00:10.000,36.1,1003
 GroupedTumblingWindow,Point,12:00:10.000,37.3,1004
 GroupedTumblingWindow,Point,12:00:10.000,31.8,1005
 GroupedTumblingWindow,Point,12:00:10.000,41.4,1006
 GroupedTumblingWindow,Point,12:00:10.000,40.3,1007
 GroupedTumblingWindow,Point,12:00:10.000,43.5,1008
 GroupedTumblingWindow,Point,12:00:10.000,43.3,1009
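Grouping followed by a tumbling window amounts to keying each event on both its sensor ID and its window.  Here is a minimal sketch of that idea in plain LINQ to Objects (not the StreamInsight API); the events and the `WindowStart` helper are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical point events (timestamp, sensor, value); not the sample .csv data.
var t0 = new DateTime(2010, 1, 1, 12, 0, 0);
var events = new List<(DateTime Time, int SensorId, double Value)>
{
    (t0.AddSeconds(0), 1001, 10),
    (t0.AddSeconds(1), 1002, 30),
    (t0.AddSeconds(3), 1001, 20),
    (t0.AddSeconds(12), 1001, 50),
    (t0.AddSeconds(13), 1002, 70),
    (t0.AddSeconds(15), 1002, 90),
};

var size = TimeSpan.FromSeconds(10);
DateTime WindowStart(DateTime t) => new DateTime(t.Ticks - (t.Ticks % size.Ticks));

// Composite key: one bucket per (sensor, window) pair.
var results = events
    .GroupBy(e => (e.SensorId, Start: WindowStart(e.Time)))
    .OrderBy(g => g.Key.Start)
    .ThenBy(g => g.Key.SensorId)
    .Select(g => (g.Key.Start, g.Key.SensorId, Average: g.Average(e => e.Value)))
    .ToList();

foreach (var r in results)
    Console.WriteLine($"{r.Start:HH:mm:ss.fff},{r.Average},{r.SensorId}");
```

Each sensor gets its own average per window, which matches the shape of the output above: one row per sensor for every 10-second window.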

How can I calculate the average of each group of events in the last 20 seconds, every 10 seconds?

 var query = from e in inputStream
             group e by e.SensorId into sensorGroups
             from window in sensorGroups.HoppingWindow(
                 TimeSpan.FromSeconds(20), TimeSpan.FromSeconds(10),
                 HoppingWindowOutputPolicy.ClipToWindowEnd)
             select new
             {
                 SensorId = sensorGroups.Key,
                 Average = window.Avg(e => e.Value)
             };

[Input] Result:

 GroupedHoppingWindow,Point,11:59:50.000,42.1,1001
 GroupedHoppingWindow,Point,11:59:50.000,34.6,1002
 GroupedHoppingWindow,Point,11:59:50.000,39.7,1003
 GroupedHoppingWindow,Point,11:59:50.000,50.9,1004
 GroupedHoppingWindow,Point,11:59:50.000,30.7,1005
 GroupedHoppingWindow,Point,11:59:50.000,38.3,1006
 GroupedHoppingWindow,Point,11:59:50.000,36.8,1007
 GroupedHoppingWindow,Point,11:59:50.000,41.7,1008
 GroupedHoppingWindow,Point,11:59:50.000,34.4,1009
 GroupedHoppingWindow,Point,12:00:00.000,44,1001
 GroupedHoppingWindow,Point,12:00:00.000,37.0,1002
 GroupedHoppingWindow,Point,12:00:00.000,38.1,1003
 GroupedHoppingWindow,Point,12:00:00.000,44.8,1004
 GroupedHoppingWindow,Point,12:00:00.000,31.2,1005
 GroupedHoppingWindow,Point,12:00:00.000,39.7,1006
 GroupedHoppingWindow,Point,12:00:00.000,38.4,1007
 GroupedHoppingWindow,Point,12:00:00.000,42.5,1008
 GroupedHoppingWindow,Point,12:00:00.000,38.4,1009
 GroupedHoppingWindow,Point,12:00:10.000,46.3,1001
 GroupedHoppingWindow,Point,12:00:10.000,40.2,1002
 GroupedHoppingWindow,Point,12:00:10.000,36.1,1003
 GroupedHoppingWindow,Point,12:00:10.000,37.3,1004
 GroupedHoppingWindow,Point,12:00:10.000,31.8,1005
 GroupedHoppingWindow,Point,12:00:10.000,41.4,1006
 GroupedHoppingWindow,Point,12:00:10.000,40.3,1007
 GroupedHoppingWindow,Point,12:00:10.000,43.5,1008
 GroupedHoppingWindow,Point,12:00:10.000,43.3,1009

How can I calculate the average of each group of events in the past 10 seconds, every time a new event in that group arrives?

 var query = from e in inputStream
                 .AlterEventDuration(e => TimeSpan.FromSeconds(10))
             group e by e.SensorId into sensorGroups
             from window in sensorGroups
                 .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
             select new
             {
                 SensorId = sensorGroups.Key,
                 Average = window.Avg(e => e.Value)
             };

[Input] Result:

 <snip>
 GroupedSlidingWindow,Point,12:00:17.017,39.3,1008
 GroupedSlidingWindow,Point,12:00:17.018,39.4,1009
 GroupedSlidingWindow,Point,12:00:17.019,43,1001
 GroupedSlidingWindow,Point,12:00:17.020,43.1,1002
 GroupedSlidingWindow,Point,12:00:17.021,37.8,1003
 GroupedSlidingWindow,Point,12:00:17.022,41.8,1004
 GroupedSlidingWindow,Point,12:00:17.023,33.7,1005
 GroupedSlidingWindow,Point,12:00:17.024,39.2,1006
 GroupedSlidingWindow,Point,12:00:17.025,40.3,1007
 GroupedSlidingWindow,Point,12:00:17.026,38.9,1008
 GroupedSlidingWindow,Point,12:00:17.027,40.8,1009
 <snip>

How can I find the events with the largest value every 10 seconds?

 var query = (from window in inputStream.TumblingWindow(
                  TimeSpan.FromSeconds(10),
                  HoppingWindowOutputPolicy.ClipToWindowEnd)
              from e in window
              orderby e.Value descending
              select e).Take(1);

[Input] Result:

 TopK_TumblingDescending,Point,12:00:00.000,1006,80
 TopK_TumblingDescending,Point,12:00:00.000,1007,80
 TopK_TumblingDescending,Point,12:00:00.000,1009,80
 TopK_TumblingDescending,Point,12:00:10.000,1007,81

Note that for the first window, there are three events with the same maximum value (80).  As a result, all of the events with that maximum value are returned.
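This differs from Take(1) in ordinary LINQ to Objects, which always returns exactly one element.  The sketch below contrasts the two in plain LINQ (not the StreamInsight API).  The "keep everything tied with the k-th element" rule is my reading of the output shown in this post, not a documented guarantee, and the two non-maximum events in the sample window are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// One window of events (sensor, value). The three 80s come from the result
// above; the other two events are hypothetical filler.
var window = new List<(int SensorId, int Value)>
{
    (1006, 80), (1003, 42), (1007, 80), (1009, 80), (1001, 17),
};

int k = 1;
var ordered = window.OrderByDescending(e => e.Value).ToList();

// Ordinary LINQ: exactly k elements, ties broken by input order.
var plain = ordered.Take(k).ToList();

// Tie-inclusive top-k (assumed semantics): keep every event whose value is at
// least as large as the value of the k-th ranked event.
var top = ordered.Count == 0
    ? ordered
    : ordered
        .TakeWhile(e => e.Value >= ordered[Math.Min(k, ordered.Count) - 1].Value)
        .ToList();

foreach (var e in top)
    Console.WriteLine($"{e.SensorId},{e.Value}");
```

Here `plain` holds a single event while `top` holds all three events with value 80.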

How can I find the events with the two smallest values in the last 10 seconds, every 5 seconds?

 var query = (from window in inputStream.HoppingWindow(
                  TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(5),
                  HoppingWindowOutputPolicy.ClipToWindowEnd)
              from e in window
              orderby e.Value ascending
              select e).Take(2);

[Input] Result:

 TopK_HoppingAscending,Point,11:59:55.000,1004,2
 TopK_HoppingAscending,Point,11:59:55.000,1007,1
 TopK_HoppingAscending,Point,11:59:55.000,1008,2
 TopK_HoppingAscending,Point,12:00:00.000,1001,1
 TopK_HoppingAscending,Point,12:00:00.000,1001,1
 TopK_HoppingAscending,Point,12:00:00.000,1003,1
 TopK_HoppingAscending,Point,12:00:00.000,1007,1
 TopK_HoppingAscending,Point,12:00:05.000,1001,1
 TopK_HoppingAscending,Point,12:00:05.000,1001,1
 TopK_HoppingAscending,Point,12:00:05.000,1003,1
 TopK_HoppingAscending,Point,12:00:10.000,1004,2
 TopK_HoppingAscending,Point,12:00:10.000,1004,2
 TopK_HoppingAscending,Point,12:00:10.000,1006,2
 TopK_HoppingAscending,Point,12:00:10.000,1007,2
 TopK_HoppingAscending,Point,12:00:15.000,1004,2
 TopK_HoppingAscending,Point,12:00:15.000,1007,2

Note the same effect here: multiple events have identical minimum values in the window.

How can I find the group of events with the highest average value in the last 10 seconds?

This query is performed in two stages.  In the first query we determine the average value of each sensor over the past 10 seconds (using a group by, a tumbling window and Avg()).  Then we apply a snapshot window to the output of that aggregate (so that each set of results from the first query forms one window), sort it by average and take the top value.

 var avg = from e in inputStream
    group e by e.SensorId into sensorGroups
    from window in sensorGroups.TumblingWindow(
        TimeSpan.FromSeconds(10),
        HoppingWindowOutputPolicy.ClipToWindowEnd)
    select new
    {
        SensorId = sensorGroups.Key,
        Average = window.Avg(e => e.Value)
    };

var query = (from win in avg
        .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
    from e in win
    orderby e.Average descending
    select e).Take(1);

[Input] Results:

 TopK_GroupTumblingDescending,Point,12:00:00.000,50.9,1004
 TopK_GroupTumblingDescending,Point,12:00:10.000,46.3,1001

How can I find the group of events with the highest maximum value in the last 10 seconds, every 5 seconds?

 // Per-sensor maximum and minimum over the last 10 seconds, every 5 seconds.
 var extremes = from e in inputStream
     group e by e.SensorId into sensorGroups
     from window in sensorGroups.HoppingWindow(
         TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(5),
         HoppingWindowOutputPolicy.ClipToWindowEnd)
     select new
     {
         SensorId = sensorGroups.Key,
         Maximum = window.Max(e => e.Value),
         Minimum = window.Min(e => e.Value)
     };

 // Rank the per-sensor results in each snapshot by their maximum.
 var query = (from win in extremes
         .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
     from e in win
     orderby e.Maximum descending
     select e).Take(1);

[Input] Result:

 TopK_GroupHoppingDescending,Point,11:59:55.000,80,6,1006
 TopK_GroupHoppingDescending,Point,11:59:55.000,80,1,1007
 TopK_GroupHoppingDescending,Point,11:59:55.000,80,4,1009
 TopK_GroupHoppingDescending,Point,12:00:00.000,80,6,1006
 TopK_GroupHoppingDescending,Point,12:00:00.000,80,6,1006
 TopK_GroupHoppingDescending,Point,12:00:00.000,80,1,1007
 TopK_GroupHoppingDescending,Point,12:00:00.000,80,1,1007
 TopK_GroupHoppingDescending,Point,12:00:00.000,80,2,1009
 TopK_GroupHoppingDescending,Point,12:00:00.000,80,4,1009
 TopK_GroupHoppingDescending,Point,12:00:05.000,80,2,1006
 TopK_GroupHoppingDescending,Point,12:00:05.000,80,6,1006
 TopK_GroupHoppingDescending,Point,12:00:05.000,80,1,1007
 TopK_GroupHoppingDescending,Point,12:00:05.000,80,2,1009
 TopK_GroupHoppingDescending,Point,12:00:10.000,81,2,1007
 TopK_GroupHoppingDescending,Point,12:00:15.000,81,2,1007
 TopK_GroupHoppingDescending,Point,12:00:15.000,81,2,1007
 TopK_GroupHoppingDescending,Point,12:00:20.000,81,2,1007

Again, note that the StreamInsight engine delivers every result tied for the top maximum value (80 or 81).

How can I detect if two events with the same ID value are received within a specific time frame, such as a 5 minute window?

To implement this query, we use a sliding window to look at arriving events in a 5 minute window, grouped by their ID value.  We then count the number of events in the window with that ID, and filter out groups that have fewer than 2 events.  Or, to say it another way, we only want groups (sensors) that have more than 1 event, thus removing groups with only 1 event (there are no groups with zero :).

 var eventCount = from e in inputStream.AlterEventDuration(e => TimeSpan.FromMinutes(5))
     group e by e.SensorId into sensorGroups
     from win in sensorGroups.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
     select new
     {
         SensorId = sensorGroups.Key,
         Count = win.Count()
     };

 var query = from e in eventCount
             where e.Count >= 2
             select e;

[Input] Result:

 DetectMultipleInWindow,Point,12:05:16.206,4,1008
 DetectMultipleInWindow,Point,12:05:16.207,4,1009
 DetectMultipleInWindow,Point,12:05:16.208,3,1001
 DetectMultipleInWindow,Point,12:05:16.209,3,1002
 <snip>
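As a sanity check on the logic, the same question can be asked of a finite list in plain LINQ to Objects (not the StreamInsight API): for each arriving event, count the events with the same ID whose timestamps fall in the 5 minutes up to and including it, and keep the ones where that count is at least 2.  The sample events are made up, and this sketch only reports at arrival times, whereas the snapshot query above also produces output when older events expire.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical point events (timestamp, sensor); not the sample .csv data.
var t0 = new DateTime(2010, 1, 1, 12, 0, 0);
var events = new List<(DateTime Time, int SensorId)>
{
    (t0, 1001),
    (t0.AddMinutes(2), 1002),
    (t0.AddMinutes(4), 1001),   // second 1001 event within 5 minutes
    (t0.AddMinutes(12), 1002),  // previous 1002 event is 10 minutes old
};

var span = TimeSpan.FromMinutes(5);

var repeats = events
    .Select(e => (
        e.Time,
        e.SensorId,
        // Events with the same ID in the half-open interval (Time - span, Time].
        Count: events.Count(o => o.SensorId == e.SensorId
                              && o.Time <= e.Time
                              && o.Time > e.Time - span)))
    .Where(r => r.Count >= 2)
    .ToList();

foreach (var r in repeats)
    Console.WriteLine($"{r.Time:HH:mm:ss.fff},{r.Count},{r.SensorId}");
```

Only the second 1001 event qualifies here; the two 1002 events are too far apart to share a window.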