Share via


Parameterized StreamInsight Queries

The changes in our APIs enable a set of scenarios that were either not possible before or could only be achieved through workarounds. One such use case that people ask about frequently is the ability to parameterize a query and instantiate it with different values instead of re-deploying the entire statement. I’ll demonstrate how to do this in StreamInsight 2.1 and combine it with a method of using subjects for dynamic query composition in a mini-series of (at least) two blog articles.

Let’s start with something really simple: I want to deploy a windowed aggregate to a StreamInsight server, and later use it with different window sizes. The LINQ statement for such an aggregate is very straightforward and familiar:

var result = from win in stream.TumblingWindow(TimeSpan.FromSeconds(5)) 
             select win.Avg(e => e.Value);

Obviously, we had to use an existing input stream object as well as a concrete TimeSpan value. If we want to be able to re-use this construct, we can define it as a IQStreamable:

var avg = myApp
    .DefineStreamable((IQStreamable<SourcePayload> s, TimeSpan w) =>
        from win in s.TumblingWindow(w)
        select win.Avg(e => e.Value));

The DefineStreamable API lets us define a function, in our case from a IQStreamable (the input stream) and a TimeSpan (the window length) to an IQStreamable (the result). We can then use it like a function, with the input stream and the window length as parameters:

var result = avg(stream, TimeSpan.FromSeconds(5));

Nice, but you might ask: what does this save me, except from writing my own extension method? Well, in addition to defining the IQStreamable function, you can actually deploy it to the server, to make it re-usable by another process! When we deploy an artifact in V2.1, we give it a name:

var avg = myApp
    .DefineStreamable((IQStreamable<SourcePayload> s, TimeSpan w) =>
        from win in s.TumblingWindow(w)
        select win.Avg(e => e.Value))
    .Deploy("AverageQuery");

When connected to the same server, we can now use that name to retrieve the IQStreamable and use it with our own parameters:

var averageQuery = myApp
    .GetStreamable<IQStreamable<SourcePayload>, TimeSpan, double>("AverageQuery");

var result = averageQuery(stream, TimeSpan.FromSeconds(5));

Convenient, isn’t it?

Keep in mind that, even though the function “AverageQuery” is deployed to the server, its logic will still be instantiated into each process when the process is created. The advantage here is being able to deploy that function, so another client who wants to use it doesn’t need to ask the author for the code or assembly, but just needs to know the name of deployed entity.

A few words on the function signature of GetStreamable: the last type parameter (here: double) is the payload type of the result, not the actual result stream’s type itself. The returned object is a function from IQStreamable<SourcePayload> and TimeSpan to IQStreamable<double> .

In the next article we will integrate this usage of IQStreamables with Subjects in StreamInsight, so stay tuned!

Regards,
The StreamInsight Team