StreamInsight Checkpoints: What, How and Why?
I’ve been fielding some questions this week on checkpoints in StreamInsight. I’ll share my way of thinking about checkpoints in the hopes that it will help others build applications leveraging the feature. First, I’ll define what a StreamInsight checkpoint represents. Then I’ll explain how to interpret the high-water mark information StreamInsight provides to input and output adapters when a query is resumed after StreamInsight downtime (planned or unplanned). There’s an important piece of trivia here that may surprise even veteran users!
The MSDN documentation outlines three levels of resiliency. In this post I will focus on the strictest resiliency level, but the concepts outlined are relevant for all three.
Think of a StreamInsight query as a black box. It takes one or more sequences of events as inputs. It produces a sequence of events as output (in the upcoming 2.1 StreamInsight release, multiple output sequences are supported as well)*. It probably comes as no surprise that the black box contains some state. If your query is computing averages over windows, incoming events will contribute to some number of windows. Until the average over a particular window can be committed to the output, sums and counts need to be maintained internally. When you checkpoint a query, you’re really just saving the internal state of the query. It’s that simple!
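To picture what that internal state looks like, here is a minimal plain-C# sketch (not the StreamInsight API; the type name, field names and window-start key are illustrative assumptions) of what a windowed average forces the engine to remember:

// Not the StreamInsight API, just a plain-C# picture of the state implied by
// a windowed average: until a window closes, a running sum and count must be
// kept for every open window.
class WindowTotals
{
    public double Sum;   // running sum of payload values in the window
    public long Count;   // number of events seen in the window so far
}

// Keyed by window start time; one entry per window that has not yet produced output.
Dictionary<DateTimeOffset, WindowTotals> openWindows = new Dictionary<DateTimeOffset, WindowTotals>();

Logically, a checkpoint saves exactly this kind of accumulated state.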
Well, almost. In addition to the query state, a checkpoint captures the positions of the input and output sequences as of the checkpoint**. For example, if a checkpoint could speak, it might say “between enqueuing input events x2 and x3, and between dequeuing output events y2 and y3, this was the state of the query”.
After a StreamInsight server instance has been stopped, a query can be resumed using the checkpoint state. Input and output adapters need to do some work as well. After the checkpoint was taken (but before the server instance was stopped), the input sequence may have progressed (let’s say x3 was enqueued) and the output sequence may have progressed as well (let’s say y3 was dequeued). Ideally, the input adapter would then replay x3 and subsequent events. And the ideal output adapter would forget that it had ever seen y3 or anything after it.
Instead of forgetting – by, say, deleting rows from a table or removing lines from a log file – the output adapter may choose to suppress output events it knows have already been emitted. The latter approach is relatively difficult to get right, however:
- The output may include multiple events with the same timestamp(s) and payload value. Consider the case where the checkpoint is taken between two identical events with the same timestamp: a naïve de-duplication implementation may incorrectly suppress the second event because the first (equivalent) event has already been emitted (see the sketch after this list).
- While StreamInsight deterministically produces logically consistent results, there is no guarantee about the specific sequence of output events between runs. The number of CTIs may vary. The order of events with the same timestamp may also vary between runs (or even the order of events with different timestamps when using StreamEventOrder.ChainOrdered).
- The output adapter needs to reliably determine which events have already been emitted.
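To make the first pitfall concrete, here is a deliberately naïve de-duplication sketch. It is not adapter code from the product; the helper name and the (timestamp, payload) key are my own, and it works over the same IEnumerable<PointEvent<T>> view used later in this post. Because it keys only on timestamp and payload, the second of two identical output events is silently dropped:

// Naïve and WRONG: suppress any event whose (timestamp, payload) pair has
// already been seen. Two legitimate, identical output events collapse into
// one, so the resumed run is missing an event.
static IEnumerable<PointEvent<T>> NaiveDeduplicate<T>(IEnumerable<PointEvent<T>> ys)
{
    var emitted = new HashSet<Tuple<DateTimeOffset, T>>();
    foreach (var y in ys)
    {
        if (emitted.Add(Tuple.Create(y.StartTime, y.Payload)))
        {
            yield return y;   // first occurrence: emitted
        }
        // duplicate (timestamp, payload): incorrectly suppressed
    }
}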
In any case, the rule of thumb when resuming a query:
Input adapters must replay events after the checkpoint; output adapters must forget events after the checkpoint.
StreamInsight provides a high-water mark (HWM, pronounced huh-wim I think) value to input adapters that can be used to determine where in the input sequence a checkpoint occurred (see IHighWaterMarkInputAdapterFactory and IHighWaterMarkTypedInputAdapterFactory). Output adapters get both an HWM value and an offset (see IHighWaterMarkOutputAdapterFactory and IHighWaterMarkTypedOutputAdapterFactory). How can these values be used to identify an element of a sequence? First, let’s define HWM. An event xi has high-water mark value h if its timestamp is h and for all j < i, xj has a lower timestamp. Whenever an event in a sequence has a higher timestamp than any preceding event, it has a HWM value. Assuming that your input sequence is conveniently described by an IEnumerable<PointEvent<T>> sequence xs, you can find the event corresponding to an hwm as follows:
var inputCheckpointEvent = xs.FirstOrDefault(x => x.StartTime == hwm);
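Before moving to the output side, here is a small illustration of the definition itself (the helper name is mine, not an API): it yields exactly those events that establish a new high-water mark, which are the only events a HWM value can point to.

// Yield the events whose timestamp exceeds every preceding timestamp, i.e.
// the events that establish a new high-water mark.
static IEnumerable<PointEvent<T>> HwmEvents<T>(IEnumerable<PointEvent<T>> xs)
{
    DateTimeOffset? runningMax = null;
    foreach (var x in xs)
    {
        if (runningMax == null || x.StartTime > runningMax.Value)
        {
            runningMax = x.StartTime;   // x has HWM value x.StartTime
            yield return x;
        }
    }
}

This is also why the engine has to wait for a HWM-raising event before a checkpoint can be taken, as the walkthrough below shows.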
Assuming that the output from the query is captured in a (resilient and persistent) List<PointEvent<T>> ys, and given hwm and offset values for an output sequence, you just need to skip ahead offset positions to find the checkpoint event:
var outputCheckpointEvent = ys
    .Where(y => y.StartTime >= hwm)
    .Skip(offset)
    .FirstOrDefault();
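To see why the offset is needed, here is a tiny worked example that applies the expression above to invented data. Only timestamps matter, so plain DateTimeOffset values stand in for output events; the timestamps and the hwm and offset values are made up for illustration:

// Several output events share the high-water-mark timestamp, so the HWM
// alone cannot identify a position between them.
var t = new DateTimeOffset(2012, 5, 1, 10, 0, 0, TimeSpan.Zero);
var ys = new List<DateTimeOffset>
{
    t.AddMinutes(-1),   // index 0: before the high-water mark
    t,                  // index 1: raises the HWM to t
    t,                  // index 2: same timestamp as the HWM
    t                   // index 3: same timestamp again
};

// Suppose the checkpoint reports hwm = t and offset = 1: skipping one event
// past the first event at (or after) the HWM lands on index 2.
var outputCheckpointEvent = ys
    .Where(y => y >= t)
    .Skip(1)
    .First();           // the value at index 2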
I promised a piece of trivia. While I’ve shown you how to find the “checkpoint event”, I haven’t told you where the checkpoint occurred: did it occur before the checkpoint event, or after? It turns out that the answer is before for an input checkpoint event but after for an output checkpoint event. Returning to the earlier example, if the checkpoint occurred between input events x2 and x3, the checkpoint event will be x3, but if the checkpoint occurred between output events y2 and y3, the checkpoint event will be y2! Again using the simple IEnumerable<> contract, I can demonstrate how replay works:
// replay the checkpoint event and all subsequent events
var replayXs = hwm.HasValue
    ? xs.SkipWhile(x => x.StartTime < hwm.Value)
    : xs;
When forgetting output events and again assuming (somewhat unrealistically) that a List<> is the external reliable authority on which events have been emitted:
// remove all output events after, but not including, the checkpoint event
if (hwm.HasValue)
{
    var hwmEvent = ys
        .Select((y, i) => new { y.StartTime, Index = i })
        .FirstOrDefault(y => y.StartTime == hwm.Value);
    if (hwmEvent != null)
    {
        int forgetIndex = hwmEvent.Index + offset + 1;
        if (forgetIndex < ys.Count)
        {
            ys.RemoveRange(forgetIndex, ys.Count - forgetIndex);
        }
    }
}
Let’s tie all of this information together with a simple walkthrough. In this example, the query state is described by the input events contributing to the state.
| Action | Query state | Output list | Notes |
| --- | --- | --- | --- |
| Enqueue x1 | { x1 } | { y1 } | |
| Begin checkpoint | { x1 } | { y1 } | Waiting for HWM event on input |
| Enqueue x2 | { x1, x2 } | { y1, y2 } | Let’s assume x2 does not increase the HWM, so the checkpoint cannot be initiated yet |
| Enqueue x3 | { x1, x2 } | { y1, y2 } | x3 is the input checkpoint event; the current query state is captured before processing x3 |
| (continued) | { x1, x2, x3 } | { y1, y2, y3 } | y2 is the output checkpoint event, as it is the last event before the checkpoint |
| End checkpoint | { x1, x2, x3 } | { y1, y2, y3 } | Checkpoint <{ x1, x2 }, x3, y2> committed to disk |
| Enqueue x4 | { x1, x2, x3, x4 } | { y1, y2, y3, y4 } | |
| Downtime! | | | |
| Resume | { x1, x2 } | { y1, y2 } | StreamInsight recovers query state; the output adapter forgets all output events after y2 |
| Enqueue x3 | { x1, x2, x3 } | { y1, y2, y3 } | Input replays from x3 |
| Enqueue x4 | { x1, x2, x3, x4 } | { y1, y2, y3, y4 } | |
| … | | | |
Notice that by replaying input events and forgetting output events, we see correct output after resuming the query!
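To make the walkthrough concrete, here is a self-contained sketch that applies the earlier replay and forget snippets to the x1..x4 / y1..y4 sequences from the table. It deliberately avoids the real adapter interfaces: the events are anonymous name-plus-timestamp stand-ins, the timestamps are invented, and the hwm and offset values simply mirror the <{ x1, x2 }, x3, y2> checkpoint above.

// Stand-ins for illustration only (not StreamInsight types). Timestamps are
// chosen to match the table: x2 shares x1's timestamp, so it does not raise the HWM.
var t0 = new DateTimeOffset(2012, 5, 1, 10, 0, 0, TimeSpan.Zero);
var xs = new[]
{
    new { Name = "x1", StartTime = t0 },
    new { Name = "x2", StartTime = t0 },                // same timestamp: does not raise the HWM
    new { Name = "x3", StartTime = t0.AddMinutes(1) },  // raises the HWM: the input checkpoint event
    new { Name = "x4", StartTime = t0.AddMinutes(2) }
}.ToList();
var ys = new[]
{
    new { Name = "y1", StartTime = t0 },
    new { Name = "y2", StartTime = t0.AddMinutes(1) },  // raises the output HWM: the output checkpoint event
    new { Name = "y3", StartTime = t0.AddMinutes(1) },
    new { Name = "y4", StartTime = t0.AddMinutes(2) }
}.ToList();

// Illustrative values the checkpoint would report on resume:
DateTimeOffset? inputHwm = t0.AddMinutes(1);   // timestamp of x3
DateTimeOffset? outputHwm = t0.AddMinutes(1);  // timestamp of y2 (assumed to be the output HWM)
int offset = 0;                                // assumed: nothing dequeued between y2 and the checkpoint

// Input side: replay the checkpoint event and everything after it.
var replayXs = inputHwm.HasValue
    ? xs.SkipWhile(x => x.StartTime < inputHwm.Value)
    : xs;

// Output side: forget everything after the checkpoint event.
if (outputHwm.HasValue)
{
    var hwmEvent = ys
        .Select((y, i) => new { y.StartTime, Index = i })
        .FirstOrDefault(y => y.StartTime == outputHwm.Value);
    if (hwmEvent != null)
    {
        int forgetIndex = hwmEvent.Index + offset + 1;
        if (forgetIndex < ys.Count)
        {
            ys.RemoveRange(forgetIndex, ys.Count - forgetIndex);
        }
    }
}
// replayXs now yields x3 and x4, and ys holds y1 and y2: exactly the Resume row above.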
* Notice that I’m talking about “sequences of events” rather than “temporal streams”. While StreamInsight operators are defined in terms of temporal streams – sequences of timestamped events with ordering constraints imposed by common time increments (or CTIs) – a checkpoint can be understood relative to generic sequences.
** StreamInsight doesn’t actually freeze the query operators, inputs and outputs in order to take a snapshot of the internal state, but this is a useful and accurate way of characterizing the logical contract for checkpoints. Details of the internal mechanism aren’t covered in this blog post.