StreamInsight: Reading from other data contexts into StreamInsight with LINQPad

The LINQPad driver for StreamInsight provides a very easy way to run StreamInsight queries against readily accessible data contexts of temporal streams.  However, because LINQPad supports only a single active data context connection per query, it is difficult to combine StreamInsight with a source that is hard to read without a data context of its own (such as OData).

Because LINQPad can execute generic .NET code, we can use a little trick to get around the single-connection limitation: simply place the embedded server creation code (i.e. Server.Create()) directly into a LINQPad query statement, regardless of the active context.

Step 1: Reading an OData Service in LINQPad

We’ll start off by importing the Northwind data set as an OData service, then convert the Orders table into a CepStream.  For more background on how to work with the new (as of 1.1) IEnumerable/IObservable adapters for StreamInsight, refer to Colin Meek’s excellent StreamInsight Sequence Integration: Five Easy Pieces post.

  • Start a copy of LINQPad with the StreamInsight driver installed.  Refer to the StreamInsight LINQPad Driver post if you haven’t yet done this.
  • From the data context list, click on Add Connection.  From the Choose Data Context dialog, select WCF Data Services (OData) and click Next.


The Northwind OData context is now available for use in LINQPad.  One question that might immediately crop up: why not use the StreamInsight data context instead of the OData context?  LINQPad, unfortunately, doesn’t yet support multiple data contexts in a single query.  We need to pull data from one context (LINQ to OData) and process it in another (StreamInsight), and the LINQ to OData context is rather challenging to create on the fly.  So we’ll use the Northwind OData context as our primary context and dynamically create a StreamInsight embedded server.

  • The Northwind data context should now be available in the list of connections.  From the query pane, select C# Statement(s) as the language, and the Northwind URI as the database.  We use C# Statement instead of C# Expression to let us build up more complicated objects and code (such as the StreamInsight embedded server object).


  • Let’s run a simple query to make sure that the data feed is flowing through appropriately.  Type the code in the block below into the query window and press F5 to execute the query.

    // Retrieve orders from the Northwind OData feed that
    // have valid shipping dates and order dates. Select
    // fields to be used in the StreamInsight query (basic
    // types only)
    var orders = from o in Orders
                 where o.ShippedDate != null && o.OrderDate != null
                 orderby o.OrderDate ascending
                 select new
                 {
                     CustomerName = o.Customer.CompanyName,
                     OrderDate = (DateTime)o.OrderDate,
                     ShippedDate = (DateTime)o.ShippedDate,
                     Region = o.ShipRegion
                 };

    orders.Dump();

  • The results should be similar to those shown in the screenshot below.  Once you’ve validated that the OData feed is being consumed properly, delete the orders.Dump() call.
IOrderedQueryable<> (200 items)

CustomerName               OrderDate        ShippedDate      Region
Vins et alcools Chevalier  7/4/1996 0:00    7/16/1996 0:00   null
Toms Spezialitäten         7/5/1996 0:00    7/10/1996 0:00   null
Hanari Carnes              7/8/1996 0:00    7/12/1996 0:00   RJ
Victuailles en stock       7/8/1996 0:00    7/15/1996 0:00   null
Suprêmes délices           7/9/1996 0:00    7/11/1996 0:00   null
Hanari Carnes              7/10/1996 0:00   7/16/1996 0:00   RJ
Chop-suey Chinese          7/11/1996 0:00   7/23/1996 0:00   null
Richter Supermarkt         7/12/1996 0:00   7/15/1996 0:00   null
Wellington Importadora     7/15/1996 0:00   7/17/1996 0:00   SP
HILARION-Abastos           7/16/1996 0:00   7/22/1996 0:00   Táchira

 

Step 2: Creating a StreamInsight host and a basic query

Now that we have the OData feed available in LINQPad, let’s set things up so that we can create and execute StreamInsight queries without a StreamInsight data context.  (A data context or data connection makes this a lot easier, but since the query’s single connection is already taken by the Northwind OData context, the StreamInsight side has to be defined manually.)

  • In LINQPad, click on the Query menu item, then on Query Properties (or press F4).
  • In the Query Properties dialog, add the following references to the Additional References list.
    • Microsoft.ComplexEventProcessing.dll
    • Microsoft.ComplexEventProcessing.Observable.dll

  • Click on the Additional Namespace Imports tab, and add the following namespaces.
    • Microsoft.ComplexEventProcessing
    • Microsoft.ComplexEventProcessing.Linq

  • Since this won’t be the only time (I hope) that we use this technique, go ahead and click on the Set as default for new queries button, click OK to acknowledge the popup, then click OK to close the Query Properties dialog.

  • Now we can go ahead and define an embedded StreamInsight server to process our queries, by pasting the code below into the LINQPad query window.

    using (Server server = Server.Create("Default"))
    {
        Application app = server.CreateApplication("test");

        // The stream definitions and queries from the following steps go here
    }

  • Using the techniques described in Colin’s excellent post on using Sequences with StreamInsight v1.1, let’s convert the set of orders into a temporal stream.  The code below demonstrates how to obtain a CepStream<> of intervals from the enumerable set of orders.

    // Convert the set of orders into an interval stream
    var orderStream = orders.ToIntervalStream(app, t =>
        IntervalEvent.CreateInsert((DateTimeOffset)t.OrderDate, (DateTimeOffset)t.ShippedDate, t),
        AdvanceTimeSettings.IncreasingStartTime);

  • Now we’re ready to write our basic query.  Let’s look for the condition wherein we have more than two outstanding orders in a region at the same time (i.e. orders whose order-date-to-ship-date intervals overlap).

    // Group the order stream by region, and create
    // snapshot windows to look at the sets of overlapping
    // orders. Count the number of orders.
    var regionCounts = from o in orderStream
                       group o by o.Region into regionGroups
                       from win in regionGroups.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                       select new { ShipRegion = regionGroups.Key, Count = win.Count() };

    // Look for regions which have more than 2 overlapping
    // orders
    var filter = from e in regionCounts
                 where e.Count > 2
                 select e;

    // Dump the results of the query out into an enumerable
    var snk = from i in filter.ToIntervalEnumerable()
              where i.EventKind == EventKind.Insert
              select new { i.StartTime, i.EndTime, i.Payload.ShipRegion, i.Payload.Count };
    snk.Dump();

  • Let’s run the query (by pressing F5) and observe the results.

IEnumerable<> (130 items)

StartTime                    EndTime                      ShipRegion  Count
7/8/1996 7:00:00 AM +00:00   7/9/1996 7:00:00 AM +00:00   null        3
7/9/1996 7:00:00 AM +00:00   7/10/1996 7:00:00 AM +00:00  null        4
7/10/1996 7:00:00 AM +00:00  7/11/1996 7:00:00 AM +00:00  null        3
7/11/1996 7:00:00 AM +00:00  7/12/1996 7:00:00 AM +00:00  null        3
7/12/1996 7:00:00 AM +00:00  7/15/1996 7:00:00 AM +00:00  null        4
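
A side note on the 7:00:00 AM +00:00 timestamps: the Northwind dates are midnight values, so the shift is most likely a time zone effect.  Casting a DateTime to DateTimeOffset uses the machine's local offset unless the value's Kind is Utc, so a machine seven hours behind UTC would show midnight as 7:00 AM +00:00.  The sketch below is plain .NET, not StreamInsight.  It assumes the OData client returns dates with Kind = Unspecified and that reading them as UTC is acceptable.  AsUtc is a hypothetical helper name introduced here for illustration.

```csharp
using System;

public static class TimestampSketch
{
    // Hypothetical helper (not part of StreamInsight): read a DateTime as UTC
    // regardless of its Kind, so the result does not depend on the machine.
    public static DateTimeOffset AsUtc(DateTime value)
    {
        return new DateTimeOffset(DateTime.SpecifyKind(value, DateTimeKind.Utc));
    }

    public static void Main()
    {
        // Assumption: the dates arrive with Kind = Unspecified.
        var orderDate = new DateTime(1996, 7, 8);

        // The cast used in the query: Unspecified is treated as local time,
        // so the resulting UTC instant varies with the machine's time zone.
        DateTimeOffset viaCast = (DateTimeOffset)orderDate;

        Console.WriteLine(viaCast.ToUniversalTime());
        Console.WriteLine(AsUtc(orderDate).ToUniversalTime());
    }
}
```

If time-zone-independent event times matter, the casts inside IntervalEvent.CreateInsert could be swapped for a helper along these lines.  The window boundaries shift, but the overlap counts stay the same.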

An interesting result, combining LINQPad exploration of a WCF Data Services (OData) feed with StreamInsight.  But what went into producing that result?  Which collection of events resulted in each output event?  Answering that question will be the subject of my next post, on using the Event Flow Debugger with LINQPad and StreamInsight.
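
As a rough intuition for how a snapshot window arrives at those counts, here is a minimal sketch in plain C# (LINQ to Objects, no StreamInsight).  It assumes snapshot semantics can be approximated by cutting each region's timeline at every event start and end time, then counting the orders alive in each segment.  This is an illustration under that assumption, not StreamInsight's implementation.  SnapshotCounts and SampleOrders are hypothetical names, and the sample data is just the ten orders listed in the Step 1 results.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SnapshotSketch
{
    // Hypothetical helper: per region, cut the timeline at every start/end
    // time and count the intervals alive in each resulting segment.
    public static IEnumerable<(DateTime Start, DateTime End, string Region, int Count)> SnapshotCounts(
        IEnumerable<(DateTime Start, DateTime End, string Region)> orders)
    {
        foreach (var g in orders.GroupBy(o => o.Region))
        {
            var edges = g.SelectMany(o => new[] { o.Start, o.End })
                         .Distinct().OrderBy(t => t).ToList();

            for (int i = 0; i + 1 < edges.Count; i++)
            {
                DateTime segStart = edges[i], segEnd = edges[i + 1];

                // Intervals are treated as [Start, End): alive at segStart
                // means alive for the whole segment.
                int count = g.Count(o => o.Start <= segStart && o.End > segStart);
                if (count > 0)
                    yield return (segStart, segEnd, g.Key, count);
            }
        }
    }

    // The first ten orders from the Step 1 results (all in 1996).
    public static (DateTime Start, DateTime End, string Region)[] SampleOrders()
    {
        DateTime D(int month, int day) => new DateTime(1996, month, day);
        return new (DateTime Start, DateTime End, string Region)[]
        {
            (D(7, 4),  D(7, 16), null),
            (D(7, 5),  D(7, 10), null),
            (D(7, 8),  D(7, 12), "RJ"),
            (D(7, 8),  D(7, 15), null),
            (D(7, 9),  D(7, 11), null),
            (D(7, 10), D(7, 16), "RJ"),
            (D(7, 11), D(7, 23), null),
            (D(7, 12), D(7, 15), null),
            (D(7, 15), D(7, 17), "SP"),
            (D(7, 16), D(7, 22), "Táchira"),
        };
    }

    public static void Main()
    {
        // Same filter as the StreamInsight query: more than 2 overlapping orders.
        foreach (var s in SnapshotCounts(SampleOrders()).Where(s => s.Count > 2))
        {
            Console.WriteLine("{0:yyyy-MM-dd} .. {1:yyyy-MM-dd}  {2}  {3}",
                s.Start, s.End, s.Region ?? "null", s.Count);
        }
    }
}
```

On these ten rows the sketch yields five windows for the null region, with counts 3, 4, 3, 3 and 4.  Ignoring the time-of-day offset, they line up with the first five rows of the StreamInsight output above.  Later windows would differ, because the real query sees all 200 orders.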

For reference, here is the complete query:

    // Retrieve orders from the Northwind OData feed that
    // have valid shipping dates and order dates. Select
    // fields to be used in the StreamInsight query (basic
    // types only)
    var orders = from o in Orders
                 where o.ShippedDate != null && o.OrderDate != null
                 orderby o.OrderDate ascending
                 select new
                 {
                     CustomerName = o.Customer.CompanyName,
                     OrderDate = (DateTime)o.OrderDate,
                     ShippedDate = (DateTime)o.ShippedDate,
                     Region = o.ShipRegion
                 };

    using (Server server = Server.Create("Default"))
    {
        Application app = server.CreateApplication("test");

        // Convert the set of orders into an interval stream
        var orderStream = orders.ToIntervalStream(app, t =>
            IntervalEvent.CreateInsert((DateTimeOffset)t.OrderDate, (DateTimeOffset)t.ShippedDate, t),
            AdvanceTimeSettings.IncreasingStartTime);

        // Group the order stream by region, and create
        // snapshot windows to look at the sets of overlapping
        // orders. Count the number of orders.
        var regionCounts = from o in orderStream
                           group o by o.Region into regionGroups
                           from win in regionGroups.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                           select new { ShipRegion = regionGroups.Key, Count = win.Count() };

        // Look for regions which have more than 2 overlapping
        // orders
        var filter = from e in regionCounts
                     where e.Count > 2
                     select e;

        // Dump the results of the query out into an enumerable
        var snk = from i in filter.ToIntervalEnumerable()
                  where i.EventKind == EventKind.Insert
                  select new { i.StartTime, i.EndTime, i.Payload.ShipRegion, i.Payload.Count };
        snk.Dump();
    }