TopK
The examples in this topic demonstrate using the TopK operation to rank events in a window stream and return the top N events based on an ascending or descending order of ranking. TopK is specified through the Take() extension method, which expects a stream of type IQOrderedWindow<T>. This type is produced as soon as an orderby clause is specified. The orderby clause must operate on events within an IQWindowedStreamable<T> stream. Hence, a snapshot or hopping (tumbling) window operator must be applied on the input stream.
Note
For CEPStream<T> streams, the Take() extension method expects a stream of type CepOrderedStream<T>. The discussion in this topic applies to both IQStreamable and CEPStream streams.
In case of ties, TopK returns more than N events, so it always behaves deterministically.
TopK does not work with count-based windows.
TopK is implemented as a time-insensitive operator. Hence the lifetimes of the output events will be set according to the window’s output policy.
Examples
The following example takes the top five events from each snapshot window defined for the input stream inputStream and generates a new event stream. The events in each window are ordered in ascending order of the values in the payload field e.f, combined with descending order of the values in the payload field e.i.
// Assuming the following input event type for inputStream:
public class MyPayload
{
    public int f;
    public int i;
}
// IQStreamable example:
var topfive = from window in inputStream.Snapshot()
              from b in
                  (from e in window
                   orderby e.f ascending, e.i descending
                   select e).Take(5)
              select b;
// CEPStream example:
var topfive = (from window in inputStream.Snapshot()
               from e in window
               orderby e.f ascending, e.i descending
               select e).Take(5);
You can use a projection in a Take clause, as shown in the following example.
// IQStreamable example:
var result = from win in source
                 .TumblingWindow(TimeSpan.FromMinutes(10))
             from b in
                 (from r in win
                  orderby r.Value descending
                  select new { scaled = r.Value / 10, r.SourceId }).Take(2, e => new
                  {
                      ranking = e.Rank,
                      e.Payload.scaled,
                      e.Payload.SourceId
                  })
             select b;
// CEPStream example:
var result = (from win in source
                  .TumblingWindow(TimeSpan.FromMinutes(10), HoppingWindowOutputPolicy.ClipToWindowEnd)
              from r in win
              orderby r.Value descending
              select new { scaled = r.Value / 10, r.SourceId }).Take(2, e => new
              {
                  ranking = e.Rank,
                  e.Payload.scaled,
                  e.Payload.SourceId
              });
If the actual ranking result needs to be projected into the events' payload, a corresponding lambda expression is used. The rank is then accessed through the property Rank, while the payload fields are accessible through the property Payload.
// IQStreamable example:
var topthree = from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(2))
               from b in
                   (from e in window
                    orderby e.f ascending, e.i descending
                    select e).Take(3, e => new
                    {
                        ranking = e.Rank,
                        f = e.Payload.f,
                        i = e.Payload.i
                    })
               select b;
// CEPStream example:
var topthree = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(2), HoppingWindowOutputPolicy.ClipToWindowEnd)
                from e in window
                orderby e.f ascending, e.i descending
                select e).Take(3, e => new
                {
                    ranking = e.Rank,
                    f = e.Payload.f,
                    i = e.Payload.i
                });
The following illustration shows the temporal property of a TopK result. This example uses a hopping window with TopK, such that the two events with the highest values for the payload field val are picked from each window.
// IQStreamable example:
var two = from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(30), TimeSpan.FromMinutes(10))
          from b in
              (from e in window
               orderby e.val descending
               select e).Take(2)
          select b;
// CEPStream example:
var two = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(30), TimeSpan.FromMinutes(10), HoppingWindowOutputPolicy.ClipToWindowEnd)
           from e in window
           orderby e.val descending
           select e).Take(2);
The orange boxes represent the windows. The illustration shows how TopK sets the lifetimes of the selected payloads to the window size. Here, we assume point events in the input stream, displaying the value of their payload field val. Note that the specified output policy of ClipToWindowEnd assigns the window time span to the result events.
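To make the window arithmetic concrete, the following sketch lists the hopping windows that contain a single point event, using the 30-minute window size and 10-minute hop size from the example above. It is plain C# and does not use the StreamInsight API. The class HoppingWindowSketch, the method WindowsContaining, the use of integer minutes, and the alignment of window starts to multiples of the hop size from a zero origin are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Illustrative only: not part of the StreamInsight API.
public static class HoppingWindowSketch
{
    // Returns the [Start, End) spans, in minutes, of every hopping window that
    // contains a point event at minute t. Assumes t >= 0 and that window starts
    // fall on multiples of hop counted from minute 0 (an assumed alignment).
    public static List<(int Start, int End)> WindowsContaining(int t, int size, int hop)
    {
        var windows = new List<(int Start, int End)>();

        // Begin with the latest window start at or before t, then step back one
        // hop at a time while the window still covers t (start + size > t).
        for (int start = (t / hop) * hop; start > t - size; start -= hop)
        {
            windows.Add((start, start + size));
        }

        windows.Reverse(); // earliest window first
        return windows;
    }

    public static void Main()
    {
        // A point event at minute 25, window size 30, hop size 10.
        foreach (var w in WindowsContaining(25, 30, 10))
        {
            Console.WriteLine($"[{w.Start}, {w.End})");
        }
    }
}
```

Under these assumptions the event at minute 25 belongs to three overlapping windows, so if it ranks among the top two in each of them, it would appear in three result events, each carrying the time span of its window.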
It is important to understand the behavior of TopK for ties. Assume the following example, where the input stream contains duplicate values and the top 2 values over a tumbling window are computed. In this example, the default output policy of PointAlignToWindowEnd is used:
The top 2 values in the first window are 3 and 2. All events where the ranked field has one of these values will be included in the result, which contains three events in this example.
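The tie behavior described above can be imitated with ordinary LINQ to Objects. The following is a minimal sketch and not the StreamInsight implementation: the class TopKTiesSketch, the method TopKWithTies, and the sample window contents are hypothetical, chosen so that the top 2 values are 3 and 2 as in the description.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustrative only: plain LINQ to Objects, not the StreamInsight Take() operator.
public static class TopKTiesSketch
{
    // Returns every value in the window that equals one of the k highest
    // distinct values, so ties are kept rather than cut off at k events.
    public static List<int> TopKWithTies(IEnumerable<int> window, int k)
    {
        var events = window.ToList();

        // The k highest distinct values found in the window.
        var topValues = new HashSet<int>(
            events.Distinct().OrderByDescending(v => v).Take(k));

        // Keep all events whose value is one of those, highest first.
        return events.Where(v => topValues.Contains(v))
                     .OrderByDescending(v => v)
                     .ToList();
    }

    public static void Main()
    {
        // Hypothetical window contents with a duplicate value.
        var window = new[] { 1, 3, 2, 2 };

        var result = TopKWithTies(window, 2);
        Console.WriteLine(string.Join(", ", result)); // prints "3, 2, 2"
    }
}
```

Asking for the top 2 yields three events here because both events with the value 2 qualify, which is why the result does not depend on the arrival order of tied events.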
See Also
StreamInsight Server Concepts
Using Event Windows
Hopping Windows
Snapshot Windows