Introducing Checkpointing in StreamInsight 1.2
Hi Folks,
As I mentioned a few posts ago, I recently made a move to the StreamInsight team. Well, we’ve just released the new version, StreamInsight 1.2, so it’s time to start talking about what we’ve been up to.
One of the big features in the new version is resiliency, which helps you build highly available systems that use StreamInsight. This post gives a high-level, no-code overview of what this functionality does and how it can be used. More detailed posts will follow.
To motivate the feature, consider a few use cases:
- You have a query that consumes sensor data—say, the temperature of a machine—and computes the average temperature over the last day. It’s not the end of the world if StreamInsight misses a few minutes of data: the missed events will likely have little impact on the overall average. But if those missed events were caused by an outage, you do care (a) that your app comes back up quickly, (b) that you don’t lose the substantial state that was built up for the aggregation, and (c) that the results you get when SI comes back up are close to what you would have gotten had SI not gone down.
- You have a query that consumes Twitter data and sends you email every time someone mentions Justin Bieber. For this query, you have the same requirement (a) for speedy recovery as above, but (b) and (c) are no longer sufficient: you really don’t want to miss an important alert should StreamInsight go down. On the other hand, you don't mind getting duplicate alerts. I.e., you want the same events you would have gotten without the outage—perhaps with some duplicates.
- You have a query that consumes stock price data and produces trade events based on some secret algorithm. You have all of the requirements from the Bieber-bot—fast recovery, equivalent events—but now it could be catastrophic if you got duplicate trades. I.e., you want completely equivalent outputs whether or not there was an outage.
The form of resiliency introduced in StreamInsight 1.2 allows SI to take and restore checkpoints. A checkpoint is a serialized form of all of the internal state SI keeps for a query. When SI restarts, it will automatically restart a previously running query and restore its state from the checkpoint, effectively returning the query to the point at which the checkpoint was taken.
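To make this concrete, here’s a rough sketch of what driving checkpoints looks like in code. Take it as a sketch rather than gospel: I’m eliding the query definition entirely, and you should check the documentation for the exact configuration types and signatures.

```csharp
using System;
using Microsoft.ComplexEventProcessing;

static class ResilienceSketch
{
    // Create a server that can take and recover from checkpoints. Query
    // metadata is persisted to a SQL Compact file, and checkpoint state is
    // written to a log directory. If both already exist from a previous
    // run, the server restores checkpointed queries as described above.
    public static Server CreateResilientServer()
    {
        var metadata = new SqlCeMetadataProviderConfiguration
        {
            DataSource = "metadata.sdf",
            CreateDataSourceIfMissing = true
        };
        var checkpointing = new CheckpointConfiguration
        {
            LogPath = @".\CheckpointLog",
            CreateLogPathIfMissing = true
        };
        return Server.Create("Default", metadata, checkpointing);
    }

    // Take one checkpoint of a running query and wait for it to complete.
    // The call is asynchronous: the query keeps processing events while
    // its state is serialized.
    public static bool TakeCheckpoint(Server server, Query query)
    {
        IAsyncResult result = server.BeginCheckpoint(query, null, null);
        return server.EndCheckpoint(result); // true if the checkpoint was written
    }
}
```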
You can control when checkpoints are taken: take them more frequently and you’ll lose less in a crash; take them less frequently and you’ll keep the impact on the system low. Let’s consider how this would be used in our first use case:
Your query is happily chugging along, and you’re asking the system to take a checkpoint every five minutes. At some time—say 12:59—some doofus trips on the power cord of the machine and brings it down. Realizing what he’s done, he plugs it back in and restarts everything. At 1:05 everything’s ready and he restarts StreamInsight, which reads back the latest checkpoint from 12:55 and is ready to start accepting events again at 1:07.
In this case, SI was down from 12:59 to 1:07, and all events that should have been received during this time were lost. In addition, all events received between 12:55 and 12:59 were effectively lost as well, since they weren’t recorded in the last checkpoint before the system went down. These lost input events may cause missed output events, and can also affect aggregate results that overlap the outage. Still, you’re happy because the system is back up, you’ve only lost 12 minutes of data, and your long-term aggregates are roughly intact.
Perhaps all of this is fine for a silly temperature reading, but what if someone were to tweet a Bieber sighting during the outage? To avoid missing this critical information, StreamInsight needs some help: something needs to keep track of the events that occurred during the outage, as well as those that occurred before the outage but after the last checkpoint, and be ready to send them along when SI comes back up.
That something is the input adapter—or more properly, whatever the input adapter is connected to. In other words, there has to be a component sitting in front of StreamInsight that keeps track of recent events and can replay them for SI. And this component has to be independent and highly available as well.
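StreamInsight doesn’t ship this component, so its shape is up to you. Here’s a hypothetical sketch of the idea: a historian that journals events before forwarding them, and replays from a given point on request. Everything here, names included, is illustrative rather than part of the product, and a real journal would need to be durable and highly available rather than an in-memory list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical historian: journals every event before it reaches the
// input adapter, and can replay everything since a given point when
// StreamInsight asks for it during recovery.
public class Historian<TEvent>
{
    private readonly object sync = new object();
    private readonly List<Tuple<DateTimeOffset, TEvent>> journal =
        new List<Tuple<DateTimeOffset, TEvent>>();

    // Record an event before handing it on to StreamInsight.
    public void Record(DateTimeOffset time, TEvent e)
    {
        lock (sync) { journal.Add(Tuple.Create(time, e)); }
    }

    // On recovery, replay everything at or after the last checkpoint
    // (12:55 in the story below).
    public IEnumerable<TEvent> ReplaySince(DateTimeOffset lastCheckpoint)
    {
        lock (sync)
        {
            return journal.Where(x => x.Item1 >= lastCheckpoint)
                          .Select(x => x.Item2)
                          .ToList();
        }
    }

    // Once a newer checkpoint has been taken, older history is no
    // longer needed and can be trimmed away.
    public void TrimBefore(DateTimeOffset checkpointTime)
    {
        lock (sync) { journal.RemoveAll(x => x.Item1 < checkpointTime); }
    }
}
```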
If we have this in place, we’re ready to tackle our second use case:
Your query is happily Biebering along, taking checkpoints every minute, and now you have a historian sitting in front of it. This historian pulls tweets from Twitter and persists them before sending them on to StreamInsight. As before, our doofus trips over the cable at 12:59 and restarts StreamInsight at 1:05. But now, as part of its recovery routine, SI asks its input adapter to replay the events it missed, and the historian guarantees that SI sees everything it has missed since 12:55, when it took its last checkpoint.
And this is great: StreamInsight came back, and you haven’t missed a thing. But you notice something funny: the report of Justin’s new coif that was issued at 12:57 came through twice, once before the outage and once after. Why? Because SI forgot that it issued that event before the outage, and when the events were replayed, it got issued again.
And that’s not bad. Unless events have consequences—which is certainly the case with stock trades. To eliminate these duplicates, StreamInsight needs a little more help, this time on the output. As SI produces events, the output adapter—again, more properly, whatever the output adapter is connected to—has to remember them.
When StreamInsight comes back up after an outage, it essentially tells its output adapters the last point in time it remembers producing output for. It also guarantees that every event it produced after that point will be produced again. Since the output adapter knows what those events were, it can discard the duplicates when it sees them instead of acting on them.
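Once again, this de-duplicating component is yours to build. Here’s a hypothetical sketch of the idea, with names and shape that are mine rather than the product’s: keep a high-water mark of the last event you acted on, and drop any replayed event at or below it.

```csharp
using System;

// Hypothetical de-duplicating sink: sits behind the output adapter and
// drops replayed events it has already acted on. This works because, on
// recovery, StreamInsight regenerates every output event after the point
// it announces, so anything at or below our high-water mark must be a
// duplicate.
public class DedupSink
{
    // In a real system this mark must be persisted durably so that it
    // survives the same outage the query does. The sketch also assumes
    // strictly increasing event times; with ties you would key on a
    // sequence number or event id instead.
    private DateTimeOffset highWaterMark = DateTimeOffset.MinValue;

    public void OnEvent(DateTimeOffset eventTime, string payload)
    {
        if (eventTime <= highWaterMark)
        {
            return; // a duplicate from replay: drop it on the floor
        }
        highWaterMark = eventTime;
        Act(payload); // execute the trade, send the mail, and so on
    }

    private void Act(string payload)
    {
        Console.WriteLine("Acting on: " + payload);
    }
}
```

And this is exactly what we need in our third use case: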
Your query is happily trading away, taking checkpoints every minute. You also have a historian in front of the query recording stock prices, and a historian on the end recording trades. As before, our doofus—who we have somehow failed to fire—trips over the cable at 12:59 and restarts things at 1:05. During recovery, StreamInsight gets the input to replay all stock info since 12:55, so no input is missed. It also notifies its output that all trades produced between 12:55 and the outage will be sent again, so when the duplicate trades arrive, the output adapter just drops them on the floor.
And now we have full stream equivalence. You don’t miss a trade, and you don’t get any duplicates.
Hopefully this gives you a feel for what’s possible with checkpointing. I’ll have more to say on the specifics over the next few weeks. In the meantime, be sure to pick up the release and take a look at the documentation. And there’s a full end-to-end demo available on CodePlex as well.
Cheers,
-Isaac