Time in StreamInsight (II)
Part 2: Advancing Time in a Stream
The purpose of this post is to highlight some common cases where declaratively specifying settings for advancing time makes sense. You can also take a look at the product documentation: https://msdn.microsoft.com/en-us/library/ff518502.aspx. It provides some useful background for this discussion.
As you probably know by now, every StreamInsight computation requires a notification that time has advanced before it can produce results. That is done via a special event that we call a CTI (Current Time Increment). The CTI event has a timestamp, and intuitively, receiving such an event means that nothing before that timestamp can change anymore: we do not expect to see any more events that start earlier than the CTI timestamp.
In this article, we will focus on the specification of CTIs through the AdvanceTimeGenerationSettings interface, which can be used through the adapter factory (wrapped in AdapterAdvanceTimeSettings) as well as CepStream.Create() (wrapped in AdvanceTimeSettings).
Because a CTI makes some guarantees about timeliness, the system uses that property to generate query output based on the CTI’s timestamp. Intuitively, every event that occurs in the past from the CTI participates in the results, whereas events that occur in the future are not considered. This is a model that allows lots of flexibility and we can’t discuss all the implications here, but there are a few common cases that we want to highlight:
- Events with strictly increasing start times.
- Events with increasing start times that may contain duplicates.
- Events whose start times have bounded disorder.
If we know that we have a stream of events with strictly increasing start times, that is, events for which StartTime[i] < StartTime[i+1], we can use that knowledge when specifying AdvanceTimeSettings for the stream. Since time always moves forward, there is no reason to wait, and we expect every event to contribute to the output immediately. We can then use the settings below, which effectively move the CTI one tick past the StartTime of each event.
    new AdvanceTimeSettings(
        new AdvanceTimeGenerationSettings(1, TimeSpan.FromTicks(-1)),
        null,
        AdvanceTimePolicy.Adjust);
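To see why a delay of -1 tick places the CTI just past the event, here is a small sketch of the arithmetic. It assumes, in line with the description above, that the generated CTI timestamp is the event's start time minus the configured delay. The class and method names are illustrative and are not part of the StreamInsight API.

```csharp
using System;

// Illustrative only: models the CTI timestamp as (event start time - delay).
// CtiArithmetic and CtiFor are hypothetical names, not StreamInsight types.
public static class CtiArithmetic
{
    public static DateTimeOffset CtiFor(DateTimeOffset eventStart, TimeSpan delay)
    {
        return eventStart - delay;
    }

    public static void Main()
    {
        var start = new DateTimeOffset(2012, 1, 4, 12, 0, 0, TimeSpan.Zero);

        // A negative delay of one tick moves the CTI one tick after the start time.
        DateTimeOffset cti = CtiFor(start, TimeSpan.FromTicks(-1));

        Console.WriteLine((cti - start).Ticks); // distance from event start to CTI, in ticks
        Console.WriteLine(start < cti);         // whether the event lies before the CTI
    }
}
```

Because the event's start time lies before the CTI, the event can contribute to the output as soon as it arrives.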
If we know that the start times are increasing but may contain duplicates, we can’t move the CTI past the current event, because duplicate events would then be dropped or adjusted, depending on their event type and the policy. In that case our best choice is to emit a CTI at the StartTime of each event. This avoids dropping duplicates, but the current event does not participate in the output until an event with a higher StartTime arrives.
    new AdvanceTimeSettings(
        new AdvanceTimeGenerationSettings(1, TimeSpan.Zero),
        null,
        AdvanceTimePolicy.Adjust);
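The difference between a delay of -1 tick and a delay of zero can be checked with a tiny model. The sketch below is not StreamInsight code: it assumes that a CTI is stamped at the triggering event's start time minus the delay, and that an event violates the CTI when it starts before that timestamp. Times are plain tick counts and all names are hypothetical.

```csharp
using System;

// Illustrative model, not StreamInsight code. An event violates a CTI when its
// start time is earlier than the CTI timestamp. All names are hypothetical.
public static class DuplicateStartTimes
{
    // Assumed rule: CTI timestamp = start time of the triggering event - delay.
    private static long CtiTicks(long eventStartTicks, long delayTicks)
    {
        return eventStartTicks - delayTicks;
    }

    public static bool DuplicateViolatesCti(long delayTicks)
    {
        long firstStart = 100;     // first event
        long duplicateStart = 100; // second event with the same start time
        long cti = CtiTicks(firstStart, delayTicks);
        return duplicateStart < cti;
    }

    public static void Main()
    {
        // Delay of -1 tick: the CTI lands at 101, so the duplicate is behind it.
        Console.WriteLine(DuplicateViolatesCti(-1));

        // Delay of zero: the CTI lands at 100, so the duplicate is still allowed.
        Console.WriteLine(DuplicateViolatesCti(0));
    }
}
```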
One simple strategy for dealing with disorder in start times is to bound how far behind the latest start time seen so far a later event may fall. Suppose we know that for all events StartTime[i+1] >= StartTime[i] - d (where d is a TimeSpan expression representing the delay), and that we can afford to drop any event whose start time falls further behind than that. Then we can choose:
    new AdvanceTimeSettings(
        new AdvanceTimeGenerationSettings(1, d),
        null,
        AdvanceTimePolicy.Drop);
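The effect of these settings on a disordered stream can be sketched with a simplified simulation. This is only a model under stated assumptions, not the engine's actual behavior: times are plain tick counts, a CTI is generated after every accepted event at (start time - d), CTIs never move backward, and an event that starts before the current CTI is dropped. The class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Simplified model of per-event CTI generation combined with a Drop policy.
// Assumptions (not taken from the StreamInsight API): times are tick counts,
// the CTI after each accepted event is (start - delay), CTIs never move
// backward, and an event that starts before the current CTI is dropped.
public static class BoundedDisorder
{
    public static List<long> Accepted(IEnumerable<long> startTicks, long delayTicks)
    {
        var accepted = new List<long>();
        long cti = long.MinValue; // no CTI issued yet

        foreach (long start in startTicks)
        {
            if (start < cti)
            {
                continue; // too far behind the CTI: dropped
            }

            accepted.Add(start);
            cti = Math.Max(cti, start - delayTicks);
        }

        return accepted;
    }

    public static void Main()
    {
        long[] starts = { 10, 12, 9, 15, 8, 14 };

        // With d = 5 ticks, the late events at 9 and 14 are within the bound,
        // while 8 arrives after the CTI has already advanced to 10.
        Console.WriteLine(string.Join(", ", Accepted(starts, 5)));
    }
}
```

In this model a larger d tolerates more disorder, but it also holds the CTI further back, so results are produced later.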
Thanks,
Ciprian Gerea
Comments
Anonymous
January 04, 2012
CTI = Current Time Increment
Anonymous
January 05, 2012
Thanks, Don--corrected.