StreamInsight Sequence Integration: Five Easy Pieces

Over the past few weeks, I’ve spent time building a handful of applications using the new sequence integration APIs in StreamInsight 1.1. I think StreamInsight veterans will be pleasantly surprised at the seamlessness of the experience! If you’re new to StreamInsight, now’s your chance to quickly build a temporally aware application. In this post, I’ll walk through five components of a typical end-to-end StreamInsight query, from event source to event sink.

First, take some time to download the latest version and kick the tires.

Now you're ready to build a StreamInsight application. A few considerations:

  • If you are creating a .NET 4.0 application, make sure it is targeting the full .NET 4.0 profile, not the client profile. Configure the target framework through project properties in Visual Studio 2010.
  • Add a reference to the Microsoft.ComplexEventProcessing assembly. If you plan on using an IObservable<> event source or event sink, add a reference to the Microsoft.ComplexEventProcessing.Observable assembly as well.
  • Use the Microsoft.ComplexEventProcessing and Microsoft.ComplexEventProcessing.Linq namespaces. The first allows you to embed and manage a StreamInsight server. The second exposes the StreamInsight LINQ dialect.

Now we'll travel downstream, using the SequenceIntegration\Northwind sample as our guide.

Event Sources

Event sources for a query can be based on custom input adapters, other StreamInsight queries, or – new in version 1.1 – .NET IObservable<> and IEnumerable<> sequences. The good news: in the .NET world, IEnumerable<> pull-based sequences are pervasive. SQL, OData, SharePoint, you name it. Better news from the perspective of a Complex Event Processing system: asynchronous and push-based sequences can be easily exposed via the IObservable<> interface, particularly if you take advantage of the Reactive Extensions for .NET (Rx).

In this simple example, we use two OData service queries as our event sources:

NorthwindEntities northwind = new NorthwindEntities(
    new Uri("https://services.odata.org/Northwind/Northwind.svc"));

// Issue OData queries to determine start and end times for orders.
// So that the sources behave like temporal streams, we order by the
// corresponding dates.
var ordersWithRegions =
    from o in northwind.Orders
    where o.ShipRegion != null
    select o;

var orderStartTimes =
    from o in ordersWithRegions
    where o.OrderDate != null
    orderby o.OrderDate
    select new { StartTime = (DateTime)o.OrderDate, o.OrderID, o.ShipRegion };

var orderEndTimes =
    from o in ordersWithRegions
    where o.ShippedDate != null
    orderby o.ShippedDate
    select new { EndTime = (DateTime)o.ShippedDate, o.OrderID };

The orderStartTimes and orderEndTimes queries implement IEnumerable<>, which makes them suitable event sources for StreamInsight.
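If the OData service is out of reach, any other IEnumerable<> works just as well as a source. Here is a minimal sketch that builds the same two query shapes over an in-memory list with LINQ to Objects. The Order class and its rows are made up for illustration and are not part of the sample; the helper methods return order IDs only so the ordering is easy to inspect.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the Northwind Order entity.
public class Order
{
    public int OrderID { get; set; }
    public string ShipRegion { get; set; }
    public DateTime? OrderDate { get; set; }
    public DateTime? ShippedDate { get; set; }
}

public static class InMemorySource
{
    // Made-up rows, deliberately unsorted and with some missing values.
    public static List<Order> SampleOrders()
    {
        return new List<Order>
        {
            new Order { OrderID = 2, ShipRegion = "WA", OrderDate = new DateTime(1997, 1, 3), ShippedDate = new DateTime(1997, 1, 9) },
            new Order { OrderID = 1, ShipRegion = "WA", OrderDate = new DateTime(1997, 1, 1), ShippedDate = new DateTime(1997, 1, 5) },
            new Order { OrderID = 3, ShipRegion = null, OrderDate = new DateTime(1997, 1, 2), ShippedDate = new DateTime(1997, 1, 4) },
            new Order { OrderID = 4, ShipRegion = "OR", OrderDate = new DateTime(1997, 1, 4), ShippedDate = null }
        };
    }

    // Order IDs in the sequence a start-time source would yield them.
    public static List<int> StartOrderIds(IEnumerable<Order> orders)
    {
        var orderStartTimes =
            from o in orders
            where o.ShipRegion != null && o.OrderDate != null
            orderby o.OrderDate
            select new { StartTime = (DateTime)o.OrderDate, o.OrderID, o.ShipRegion };

        return orderStartTimes.Select(s => s.OrderID).ToList();
    }

    // Order IDs in the sequence an end-time source would yield them.
    public static List<int> EndOrderIds(IEnumerable<Order> orders)
    {
        var orderEndTimes =
            from o in orders
            where o.ShipRegion != null && o.ShippedDate != null
            orderby o.ShippedDate
            select new { EndTime = (DateTime)o.ShippedDate, o.OrderID };

        return orderEndTimes.Select(e => e.OrderID).ToList();
    }

    public static void Main()
    {
        var orders = SampleOrders();
        Console.WriteLine(string.Join(", ", StartOrderIds(orders)));
        Console.WriteLine(string.Join(", ", EndOrderIds(orders)));
    }
}
```

The orderby clauses matter here for the same reason they do in the OData version: the source has to hand over its elements in timestamp order.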

Temporal streams

An IObservable<> or IEnumerable<> event source can feed a temporal stream. A temporal stream is a sequence of events annotated with temporal information: timestamps for events and punctuation indicating when a particular point in time has been committed. In the above example, we have two sources, orderStartTimes and orderEndTimes, each including a timestamp field (StartTime and EndTime respectively) as well as a commitment, based on the orderby clauses, that event timestamps never decrease. We describe these temporal characteristics to StreamInsight using the ToPointStream method:

// Map OData queries to StreamInsight inputs
var startStream = orderStartTimes.ToPointStream(orderApp, s =>
    PointEvent.CreateInsert(s.StartTime, s), 
    AdvanceTimeSettings.IncreasingStartTime);

var endStream = orderEndTimes.ToPointStream(orderApp, e =>
    PointEvent.CreateInsert(e.EndTime, e), 
    AdvanceTimeSettings.IncreasingStartTime);

The arguments to the ToPointStream extension method are described below:

  • IEnumerable<T> source: the event source. In this case, the source is an OData query.

  • Application application: the StreamInsight application that will host the temporal query (orderApp in the code above).

  • Func<TInput, PointEvent<TPayload>> selector: takes an element of the source and turns it into a temporal event. The first selector loosely reads: “the event s happened at the point in time s.StartTime”. The first argument to PointEvent.CreateInsert indicates the timestamp for the event, and the second argument describes the payload of the event – in this case the entire row.

    Aside: StreamInsight has restrictions on payload types. Basically, the payload must be a class or struct with only “primitive” (string, number, etc.) fields and properties. See the Payload Field Requirements section at https://msdn.microsoft.com/en-us/library/ee378905.aspx for details. The event selector can be used to reshape your inputs to a supported payload type.

  • AdvanceTimeSettings advanceTimeSettings: an optional parameter that describes a policy for automatically generating punctuation. In the above example, we indicate that timestamps are increasing in the input. StreamInsight can then automatically generate Current Time Increment (CTI) punctuation for each event: “we commit to everything before the event’s timestamp”.

  • string streamName: an optional parameter allowing you to assign a name to the input stream. I have not specified a stream name in the above example. This feature is particularly useful if you’re importing punctuations from one stream to another (see the SequenceIntegration\PerformanceCounters sample for instance).

Note that there are several variations on the To*Stream method supporting IObservable<> or IEnumerable<> event sources and the shaping of point, interval or edge data.

Take a look at Cip’s Time in StreamInsight series for more information.
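To make the punctuation idea concrete, here is a small model of the policy described for AdvanceTimeSettings.IncreasingStartTime above: every insert is followed by a CTI carrying the same timestamp. This is a conceptual sketch in plain C# and does not call StreamInsight. StreamItem and WithPunctuation are names I made up, and throwing on out-of-order input is a choice for this sketch only; how the engine itself treats a violation is not modeled.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model of one item in a temporal stream: an insert carrying a
// payload, or a CTI punctuation. This is not a StreamInsight type.
public class StreamItem
{
    public bool IsCti { get; set; }
    public DateTime Time { get; set; }
    public string Payload { get; set; }

    public override string ToString()
    {
        return IsCti
            ? string.Format("CTI    {0:yyyy-MM-dd}", Time)
            : string.Format("INSERT {0:yyyy-MM-dd} {1}", Time, Payload);
    }
}

public static class PunctuationSketch
{
    // Emits each element as an insert followed by a CTI at the same timestamp.
    // A CTI at time t commits everything before t, so a later event stamped
    // exactly t is still acceptable; anything earlier is rejected here.
    public static IEnumerable<StreamItem> WithPunctuation(
        IEnumerable<KeyValuePair<DateTime, string>> source)
    {
        DateTime? committed = null;
        foreach (var e in source)
        {
            if (committed.HasValue && e.Key < committed.Value)
            {
                throw new InvalidOperationException(
                    "Event timestamp precedes a time that was already committed.");
            }

            yield return new StreamItem { IsCti = false, Time = e.Key, Payload = e.Value };
            yield return new StreamItem { IsCti = true, Time = e.Key };
            committed = e.Key;
        }
    }

    public static void Main()
    {
        var source = new[]
        {
            new KeyValuePair<DateTime, string>(new DateTime(1997, 1, 1), "order 1"),
            new KeyValuePair<DateTime, string>(new DateTime(1997, 1, 3), "order 2"),
            new KeyValuePair<DateTime, string>(new DateTime(1997, 1, 3), "order 3")
        };

        foreach (var item in WithPunctuation(source))
        {
            Console.WriteLine(item);
        }
    }
}
```

Feeding the model an unsorted source makes it throw, which is exactly why the OData queries above carry orderby clauses.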

Temporal query

Now that we have described the temporal characteristics of our event sources, we can compose a StreamInsight query. I’ll simply copy the code here without too much explanation since I’m focused on data ingress and egress in this post:

// Use clip to synthesize events lasting from the start of each order to the end
// of each order.
var clippedStream = startStream
    .AlterEventDuration(e => TimeSpan.MaxValue)
    .ClipEventDuration(endStream, (s, e) => s.OrderID == e.OrderID);

// Count the number of coincident orders per region
var counts = from o in clippedStream
             group o by o.ShipRegion into g
             from win in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
             select new { ShipRegion = g.Key, Count = win.Count() };

// Display output whenever there are more than 2 active orders in a region.
const int threshold = 2;
var query = from c in counts
            where c.Count > threshold
            select c;
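If snapshot windows are new to you, it may help to see what the query above computes in plain LINQ to Objects. The sketch below is only a model and not how StreamInsight evaluates anything: it treats each order as an interval from start (inclusive) to end (exclusive), cuts each region's timeline at every start and end point, and counts the orders that overlap each slice. The Interval and Snapshot classes, the integer day numbers, and the sample rows are all invented for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical order lifetime: [Start, End) in day numbers.
public class Interval
{
    public string Region { get; set; }
    public int Start { get; set; }
    public int End { get; set; }
}

// Hypothetical result row: how many orders were active during [Start, End).
public class Snapshot
{
    public string Region { get; set; }
    public int Start { get; set; }
    public int End { get; set; }
    public int Count { get; set; }
}

public static class SnapshotSketch
{
    public static List<Snapshot> CountCoincident(IEnumerable<Interval> orders)
    {
        var result = new List<Snapshot>();
        foreach (var g in orders.GroupBy(o => o.Region))
        {
            // Every start or end point opens a new snapshot.
            var points = g.SelectMany(o => new[] { o.Start, o.End })
                          .Distinct()
                          .OrderBy(t => t)
                          .ToList();

            for (int i = 0; i + 1 < points.Count; i++)
            {
                int start = points[i];
                int end = points[i + 1];
                int count = g.Count(o => o.Start < end && o.End > start);
                if (count > 0)
                {
                    result.Add(new Snapshot { Region = g.Key, Start = start, End = end, Count = count });
                }
            }
        }
        return result;
    }

    public static void Main()
    {
        var orders = new[]
        {
            new Interval { Region = "WA", Start = 1, End = 5 },
            new Interval { Region = "WA", Start = 3, End = 9 },
            new Interval { Region = "WA", Start = 4, End = 6 },
            new Interval { Region = "OR", Start = 2, End = 3 }
        };

        const int threshold = 2;
        foreach (var s in CountCoincident(orders).Where(s => s.Count > threshold))
        {
            Console.WriteLine("{0}: {1} active orders during [{2}, {3})", s.Region, s.Count, s.Start, s.End);
        }
    }
}
```

With these rows, only the WA slice from day 4 to day 5 has more than two active orders. The important difference is that this model needs the whole input up front, while the StreamInsight query produces results incrementally as punctuation arrives.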

Event sink

Creating an event sink is straightforward. Several extension methods support the transformation of a temporal query (CepStream<>) to an event sink, with support for permutations of IObservable or IEnumerable sequences and TPayload, PointEvent<TPayload>, IntervalEvent<TPayload> or EdgeEvent<TPayload> elements. In the following example, we translate the query to a sequence of interval events using ToIntervalEnumerable. We then keep only the insert events, which skips CTI punctuation, and project out the relevant temporal and payload fields:

// Map the query to an IEnumerable sink
var sink = from i in query.ToIntervalEnumerable()
           where i.EventKind == EventKind.Insert
           select new { i.StartTime, i.EndTime, i.Payload.Count, i.Payload.ShipRegion };

Consuming results

Now that we’re back in the world of .NET sequences, there are many possibilities for consuming the results. For now, we’ll just write the event sink contents to the console:

foreach (var r in sink)
{
    Console.WriteLine(r);
}

Interestingly, calling GetEnumerator on sink triggers a sequence of actions:

  1. A stream query is deployed to the embedded server and started, which implicitly spins up the query inputs, triggering…
  2. calls to GetEnumerator on the event sources we defined earlier, which…
  3. cause a query against the OData service to be executed.
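The same cascade shows up with ordinary C# iterators, which makes it easy to try out without a server. In this sketch, Source stands in for the OData query and Sink stands in for the StreamInsight query plus its sink; both names and the Log list are made up for illustration. Nothing runs when the pipeline is composed. With C# iterators the work starts on the first MoveNext call, which foreach issues right after GetEnumerator.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DeferredChain
{
    // Records the order in which the stages actually start running.
    public static readonly List<string> Log = new List<string>();

    // Stand-in for the OData query: does no work until it is enumerated.
    public static IEnumerable<int> Source()
    {
        Log.Add("source started");
        for (int i = 1; i <= 3; i++)
        {
            yield return i;
        }
    }

    // Stand-in for the temporal query and sink: pulls from its input on demand.
    public static IEnumerable<int> Sink(IEnumerable<int> input)
    {
        Log.Add("query started");
        foreach (var x in input)
        {
            yield return x * 10;
        }
    }

    public static void Main()
    {
        var sink = Sink(Source());
        Console.WriteLine("Log entries after composing: {0}", Log.Count);

        foreach (var r in sink)
        {
            Console.WriteLine(r);
        }

        Console.WriteLine(string.Join(" -> ", Log));
    }
}
```

The log stays empty until the foreach begins, and then the sink stage starts before the source does, mirroring steps 1 through 3 above.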

StreamInsight queries now compose seamlessly with other LINQ providers! In fact, if you review the code above you can see that we’ve leveraged LINQ to OData, LINQ to Objects, and LINQ to StreamInsight. The SequenceIntegration samples linked above illustrate some other integration possibilities as well:

  • A WPF control that observes an IObservable event sink.
  • An IObservable event source that polls performance counters.
  • An IEnumerable event source that reads the contents of a file.

There – you have the five easy pieces! Comes with a side-order of toast.

Comments

  • Anonymous
    December 08, 2010
    What is NorthwindEntities ???? Can't find it.

  • Anonymous
    January 05, 2011
    NorthwindEntities is an Entity Framework data context backed by the SQL Server Northwind database. It is included in the sequence integration sample project, which can be downloaded at streaminsight.codeplex.com/.../46435.

  • Anonymous
    May 18, 2015
    For the "Consuming results" part (the foreach statement), the loop is executed once. How can I continuously consume the results? Should I write it as follows: while (true) { foreach (var r in sink) { Console.WriteLine(r); } }