StreamInsight and reference data (lists, databases, etc)
Using reference data in StreamInsight is a very common scenario; some examples would be:
- Monitoring process control event streams for values that exceed a given threshold (for example, a valve’s pressure exceeding a certain safety threshold).
- Enriching utility usage information in a smart metering scenario with user and geography information (for example, to allow visualization of power usage by geography).
This reference data is commonly held in some form of repository, such as SQL Server or a process historian’s metadata store (e.g. OSIsoft’s PI Asset Framework), and then needs to be integrated into a StreamInsight query.
In my previous post, I showed how to use the TimeImport capability on streams to synchronize a slow-moving reference stream with a fast-moving data stream. I did not, however, walk through the process of creating a reference stream from a relatively static source (such as a database table or file). I’ll now correct that minor, and somewhat deliberate, oversight.
The project referenced in this blog post, along with the source data files, is available here.
Reference data can be integrated into StreamInsight in a couple of different ways, each with their own pros and cons:
Technique | Pros | Cons |
User-defined lookup function | Simple to implement; works with any in-memory source | No temporal semantics; the reference data cannot change while the query runs |
Reference data stream | Joins both relationally and temporally; can be extended to handle changing reference data | More setup required; a static source must first be converted into a stream |
Let’s look at an example of each technique, step by step.
The Scenario
For this illustration, let’s consider the following scenario:
- A stream of data events, each containing a user ID, an activity code, and a status (for example: user 7, logged in, success).
- A set of user metadata, contained in a file (for the purposes of illustration; a database would be a more likely source). This metadata contains information such as the user ID, user name, etc.
- We want to annotate the data events with the user metadata.
We’ll use the following data sets:
Data Events
Start Time | End Time | User Id | Activity Code | Status |
6/25/2009 0:00:00 | | 1 | Logon | Success |
6/25/2009 0:00:01 | | 2 | Logon | Fail |
6/25/2009 0:00:05 | | 1 | Browse | Success |
6/25/2009 0:00:06 | | 2 | Logon | Success |
The same data, in the .csv format that we’ll use, is included in the sample project as dataEvents.csv.
Reference Events
Start Time | End Time | User Id | User Name | Location |
6/25/2009 0:00:00 | | 1 | Fred Jones | Seattle |
6/25/2009 0:00:01 | | 2 | Bob Murphy | Portland |
The same data, in the .csv format that we’ll use, is included in the sample project as refEvents.csv.
User-Defined Lookup Function
With a user-defined lookup function, we load the list of metadata into an associative array (i.e. a hashtable) and create a lookup function that allows us to grab user metadata from the dictionary on demand.
Note: Why did I use a hard-coded dictionary rather than reading from a file? I wanted to illustrate the technique with a truly static data source. (The code that parses a .csv file into a dictionary object suitable for use in a lookup function is covered in the appendix section of the blog.)
Note: Why didn’t I simply join the dictionary with the data stream? StreamInsight joins streams both relationally and temporally. Since a dictionary isn’t a stream (and has no concept of time), a direct join isn’t possible. Converting a reference source into a stream and joining it is the other technique, covered in the next section.
Code Snippet
static Query CreateUserDefinedFunction(Application cepApp)
{
    // Pull the data events from the dataEvents.csv file
    var dataStream = CepStream<DataEvent>.Create("dataStream",
        typeof(TextFileReaderFactory), new TextFileReaderConfig()
        {
            InputFileName = "dataEvents.csv",
            CtiFrequency = 1,
            CultureName = CultureInfo.InvariantCulture.Name,
            Delimiter = ','
        }, EventShape.Point);

    // Populate the reference data statically
    refData.Add(1, new ReferenceData()
        { UserId = 1, UserName = "Fred Jones", Location = "Seattle" });
    refData.Add(2, new ReferenceData()
        { UserId = 2, UserName = "Bob Murphy", Location = "Portland" });

    // Create a stream with the user name added to the query
    var joinedStream = from e in dataStream
                       select new
                       {
                           UserId = e.UserId,
                           Activity = e.ActivityCode,
                           Status = e.Status,
                           // Use a static lookup function to retrieve the name
                           Name = LookupName(e.UserId),
                           // Use a static lookup function to retrieve the location
                           Location = LookupLocation(e.UserId)
                       };

    // Attach a trace output adapter to the stream
    var query = joinedStream.ToQuery(cepApp, "outputLookup", "",
        typeof(TracerFactory), new TracerConfig()
        {
            DisplayCtiEvents = false,
            SingleLine = false,
            TraceName = "REF",
            TracerKind = TracerKind.Console
        }, EventShape.Point, StreamEventOrder.FullyOrdered);
    return query;
}

// Reference data held in a static dictionary, keyed by user ID
static readonly Dictionary<int, ReferenceData> refData =
    new Dictionary<int, ReferenceData>();

public static string LookupName(int userId)
{
    if (refData.ContainsKey(userId))
        return refData[userId].UserName;
    else
        return "Unknown";
}

public static string LookupLocation(int userId)
{
    if (refData.ContainsKey(userId))
        return refData[userId].Location;
    else
        return "Unknown";
}
Upon running this query with the supplied data files, the console trace shows the merged output: each data event annotated with the matching user name and location.
Reference Data Stream (static data)
Using the more robust reference data stream approach, we:
- Use the TextFileReaderFactory to import the contents of the text file.
- Convert the series of point events into “infinite” signals.
- Join the reference stream with the data stream.
Note: this approach addresses the case where the reference data DOES NOT CHANGE. That’s not a very real-world scenario – we’ll cover the additions required to incorporate changing data in the next blog post, showing how to pull data from SQL Server.
Note: I don’t use the CTI import technique in this example, because the text file input adapter sends an “infinite time” CTI when it reaches the end of the file. If we were using a continuous or changing reference data source (such as SQL Server), I’d need to employ that technique.
Code Snippet
static Query CreateThresholdStream(Application cepApp)
{
    // Pull the data events from the dataEvents.csv file
    var dataStream = CepStream<DataEvent>.Create("dataStream",
        typeof(TextFileReaderFactory), new TextFileReaderConfig()
        {
            InputFileName = "dataEvents.csv",
            CtiFrequency = 1,
            CultureName = CultureInfo.InvariantCulture.Name,
            Delimiter = ','
        }, EventShape.Point);

    // Create a reference stream from the refEvents.csv file
    var refStream = CepStream<ReferenceData>.Create("refStream",
        typeof(TextFileReaderFactory), new TextFileReaderConfig()
        {
            InputFileName = "refEvents.csv",
            CtiFrequency = 1,
            CultureName = CultureInfo.InvariantCulture.Name,
            Delimiter = ','
        }, EventShape.Point);

    // Stretch the reference stream's point events out to infinity (a join
    // requires both relational and temporal matches, and for this example
    // the reference data doesn't change)
    var referenceEvents = from e in refStream.AlterEventDuration(
                              e => TimeSpan.MaxValue)
                          select e;

    // Join the two streams on user ID
    var joinedStream = from e1 in dataStream
                       join e2 in referenceEvents
                       on e1.UserId equals e2.UserId
                       select new
                       {
                           UserId = e1.UserId,
                           Activity = e1.ActivityCode,
                           Status = e1.Status,
                           // Use the reference stream to retrieve the name
                           Name = e2.UserName,
                           // Use the reference stream to retrieve the location
                           Location = e2.Location
                       };

    // Attach a trace output adapter to the stream
    var query = joinedStream.ToQuery(cepApp, "outputStream", "",
        typeof(TracerFactory), new TracerConfig()
        {
            DisplayCtiEvents = false,
            SingleLine = false,
            TraceName = "STREAM",
            TracerKind = TracerKind.Console
        }, EventShape.Point, StreamEventOrder.FullyOrdered);
    return query;
}
Upon running this query with the supplied data sets, we observe the same annotated events, this time produced by the temporal join.
Comments
- Anonymous, October 19, 2010:
  Helps a lot. Thanks Mark.
- Anonymous, June 01, 2011:
  Hi, in the first note after “Reference Data Stream (static data)” you say: “This is not a very real-world scenario – we’ll cover the additions required to incorporate changing data in the next blog post, showing how to pull data from SQL Server.” Has this blog ever been posted? If so, can you provide a link to it? If not, when will it be posted? Regards, René