

TopK

 

The examples in this topic demonstrate how to use the TopK operation to rank the events in a window stream and return the top N events, ranked in ascending or descending order. TopK is specified through the Take() extension method, which expects a stream of type IQOrderedWindow<T>. This type is produced as soon as an orderby clause is specified. The orderby clause must operate on events within an IQWindowedStreamable<T> stream. Hence, a snapshot or hopping window operator (a tumbling window is a special case of a hopping window) must first be applied to the input stream.

Note


For CEPStream<T> streams, the Take() extension method expects a stream of type CepOrderedStream<T>. The discussion in this topic applies to both IQStreamable and CEPStream streams.

In case of ties, TopK can return more than N events. Because all tied events are included, TopK always behaves deterministically.
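The following sketch makes the tie rule concrete. It mimics TopK for the events of a single window using plain LINQ to Objects and does not use any StreamInsight types. The sketch assumes that ranks are assigned to distinct ranking values. Under that assumption, every event whose value is among the N best distinct values is returned with that value's rank. This reading matches the tie example later in this topic, but it is an assumption, not the documented implementation. The TopKWindow class and its Take method are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not a StreamInsight API: simulates TopK over the
// events of ONE window using LINQ to Objects.
public static class TopKWindow
{
    // Assumed tie rule: ranks belong to distinct key values, so every event
    // sharing one of the n best values is returned with that value's rank.
    public static List<(int Rank, T Payload)> Take<T, TKey>(
        IReadOnlyList<T> window, Func<T, TKey> keySelector, int n, bool descending)
        where TKey : notnull
    {
        // The distinct ranking values, best first.
        IEnumerable<TKey> distinctKeys = window.Select(keySelector).Distinct();
        IEnumerable<TKey> ordered = descending
            ? distinctKeys.OrderByDescending(k => k)
            : distinctKeys.OrderBy(k => k);

        // Map each of the n best distinct values to its 1-based rank.
        Dictionary<TKey, int> rankOf = ordered
            .Take(n)
            .Select((key, index) => (key, rank: index + 1))
            .ToDictionary(p => p.key, p => p.rank);

        // Keep every event whose value has a rank; ties are never cut off.
        return window
            .Where(e => rankOf.ContainsKey(keySelector(e)))
            .Select(e => (Rank: rankOf[keySelector(e)], Payload: e))
            .OrderBy(r => r.Rank)
            .ToList();
    }
}
```

Consider a window with the values 5, 7, 7 and 3. Requesting the top 1 value in descending order returns both events with value 7, each with rank 1. More than N events are returned, yet the result does not depend on the order in which the events arrived.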

TopK does not work with count-based windows.

TopK is implemented as a time-insensitive operator. Hence, the lifetimes of the output events are set according to the window's output policy.

Examples

The following example takes the top five events from each snapshot window defined over the input stream inputStream and generates a new event stream. The events in each window are ordered by the payload field e.f in ascending order and then by the payload field e.i in descending order.

// Assuming the following input event type for inputStream:   
public class MyPayload   
{  
  public int f;   
  public int i;   
}  
  
// IQStreamable example:  
var topfive = from window in inputStream.Snapshot()
               from b in  
                   (from e in window  
                    orderby e.f ascending, e.i descending  
                    select e).Take(5)  
               select b;  
  
// CEPStream example:  
var topfive = (from window in inputStream.Snapshot()  
               from e in window  
               orderby e.f ascending, e.i descending  
               select e).Take(5);  
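For a single window, the ordering in these examples (e.f ascending, then e.i descending) corresponds to OrderBy followed by ThenByDescending. The following LINQ to Objects sketch reuses the MyPayload type from the example. The CompositeTopK class and its TopN method are hypothetical. The sketch assumes that two events tie only when both f and i are equal, and that tied events are all kept.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Payload type from the example above.
public class MyPayload
{
    public int f;
    public int i;
}

// Hypothetical helper, not a StreamInsight API.
public static class CompositeTopK
{
    // Ranks ONE window's events by f ascending, then i descending, and keeps
    // every event whose (f, i) pair is among the n best distinct pairs.
    public static List<MyPayload> TopN(IEnumerable<MyPayload> window, int n)
    {
        var events = window.ToList();

        // The n best distinct (f, i) pairs under the composite ordering.
        var bestPairs = events
            .Select(e => (e.f, e.i))
            .Distinct()
            .OrderBy(p => p.f)
            .ThenByDescending(p => p.i)
            .Take(n)
            .ToHashSet();

        // Return the matching events in ranking order; exact duplicates tie.
        return events
            .Where(e => bestPairs.Contains((e.f, e.i)))
            .OrderBy(e => e.f)
            .ThenByDescending(e => e.i)
            .ToList();
    }
}
```

Calling TopN(window, 5) corresponds to applying Take(5) to one snapshot window. Events with the same f are ranked by i from highest to lowest.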

You can use a projection in a Take clause, as shown in the following example.

  
// IQStreamable example:  
var result = from win in source
                       .TumblingWindow(TimeSpan.FromMinutes(10))  
                   from b in   
                       (from r in win  
                        orderby r.Value descending  
                        select new { scaled = r.Value / 10, r.SourceId }).Take(2, e => new  
                                              {  
                                                  ranking = e.Rank,  
                                                  e.Payload.scaled,
                                                  e.Payload.SourceId  
                                              })  
                   select b;  
  
// CEPStream example:  
var result = (from win in source  
                       .TumblingWindow(TimeSpan.FromMinutes(10), HoppingWindowOutputPolicy.ClipToWindowEnd)  
                   from r in win  
                   orderby r.Value descending  
                   select new { scaled = r.Value / 10, r.SourceId }).Take(2, e => new  
                                         {  
                                             ranking = e.Rank,  
                                             e.Payload.scaled,
                                             e.Payload.SourceId  
                                         });  

To project the rank itself into the payload of the output events, pass a lambda expression as the second argument to Take(). Inside the lambda, the rank is available through the Rank property, and the original payload fields are available through the Payload property.

  
// IQStreamable example:  
var topthree = from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(2))
               from b in  
                   (from e in window  
                    orderby e.f ascending, e.i descending  
                    select e).Take(3, e => new  
                    {  
                        ranking = e.Rank,  
                        f = e.Payload.f,  
                        i = e.Payload.i  
                    })  
               select b;  
  
  
// CEPStream example:  
var topthree = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(2), WindowOutputPolicy.ClipToWindowEnd)  
               from e in window  
               orderby e.f ascending, e.i descending  
               select e).Take(3, e => new  
               {  
                   ranking = e.Rank,  
                   f = e.Payload.f,  
                   i = e.Payload.i  
               });  
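The following LINQ to Objects sketch illustrates what the projection lambda receives. It receives one object per selected event, exposing the event's rank through Rank and its original fields through Payload. RankedEvent<T> and RankProjection.Take are hypothetical stand-ins written for this illustration, not StreamInsight types. The sketch assumes that the window is already sorted best-first and that events with equal ranking values share a rank.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the object the Take() projection lambda receives:
// the event's rank plus its original payload.
public sealed class RankedEvent<T>
{
    public RankedEvent(int rank, T payload)
    {
        Rank = rank;
        Payload = payload;
    }

    public int Rank { get; }
    public T Payload { get; }
}

public static class RankProjection
{
    // Hypothetical helper mirroring Take(n, projection) on ONE window that is
    // already sorted best-first. Assumption: equal keys share a rank.
    public static List<TResult> Take<T, TKey, TResult>(
        IReadOnlyList<T> orderedWindow,
        Func<T, TKey> keySelector,
        int n,
        Func<RankedEvent<T>, TResult> projection)
    {
        var results = new List<TResult>();
        var comparer = EqualityComparer<TKey>.Default;
        int rank = 0;

        for (int idx = 0; idx < orderedWindow.Count; idx++)
        {
            T current = orderedWindow[idx];

            // A new distinct key value starts the next rank.
            if (idx == 0 || !comparer.Equals(keySelector(orderedWindow[idx - 1]), keySelector(current)))
                rank++;

            // Past the n-th distinct value: stop.
            if (rank > n)
                break;

            results.Add(projection(new RankedEvent<T>(rank, current)));
        }
        return results;
    }
}
```

A projection such as e => new { ranking = e.Rank, scaled = e.Payload.Value / 10, e.Payload.SourceId } then has the same shape as the one in the tumbling-window example. It produces one anonymous-type result per selected event.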
  

The following illustration shows the temporal properties of a TopK result. This example uses a hopping window with TopK, so that the two events with the highest values in the payload field val are picked from each window.

  
// IQStreamable example:  
var two = from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(30), TimeSpan.FromMinutes(10))
               from b in  
                   (from e in window  
                    orderby e.val descending  
                    select e).Take(2)  
               select b;  
  
  
// CEPStream example:  
var two = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(30), TimeSpan.FromMinutes(10), WindowOutputPolicy.ClipToWindowEnd)  
               from e in window  
               orderby e.val descending  
               select e).Take(2);  
  

In the illustration, the orange boxes represent the windows. TopK sets the lifetime of each selected event to the time span of the window it was selected from. Here, we assume point events in the input stream, each labeled with the value of its payload field val. It is the specified output policy, ClipToWindowEnd, that assigns the window's time span to the result events.
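The lifetime assignment can also be simulated outside StreamInsight. The sketch works on point events given as minute offsets, using 30-minute windows that hop by 10 minutes. For each window it finds the top 2 val values and stamps each selected event with the start and end of that window, mirroring what the example's ClipToWindowEnd policy is described as doing. The record and class names are hypothetical. The sketch makes three assumptions:

- Windows start at multiples of the hop size, counted from minute 0.
- The window size is a multiple of the hop size.
- Tied events are all kept, as described earlier in this topic.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical input: a point event at a minute offset, carrying the field val.
public record PointEvent(int Minute, int Val);

// Hypothetical output: the selected val plus the lifetime [Start, End) taken
// from the window it was selected from (ClipToWindowEnd behavior).
public record TopKOutput(int Start, int End, int Val);

public static class HoppingTopK
{
    // Sketch only. Assumes windows start at multiples of `hop` counted from
    // minute 0 and that `size` is a multiple of `hop`.
    public static List<TopKOutput> Run(
        IReadOnlyList<PointEvent> input, int size, int hop, int n)
    {
        var output = new List<TopKOutput>();
        if (input.Count == 0) return output;

        // Earliest candidate window start; empty windows are skipped below.
        int first = input.Min(e => e.Minute) / hop * hop - size;
        int last = input.Max(e => e.Minute);

        for (int start = first; start <= last; start += hop)
        {
            int end = start + size;
            var inWindow = input
                .Where(e => e.Minute >= start && e.Minute < end)
                .ToList();
            if (inWindow.Count == 0) continue;

            // The n best distinct values; every event holding one of them is kept.
            var topValues = inWindow
                .Select(e => e.Val)
                .Distinct()
                .OrderByDescending(v => v)
                .Take(n)
                .ToHashSet();

            output.AddRange(inWindow
                .Where(e => topValues.Contains(e.Val))
                .OrderByDescending(e => e.Val)
                .Select(e => new TopKOutput(start, end, e.Val)));
        }
        return output;
    }
}
```

Because the windows overlap, each point event falls into three windows. An event with a high val can therefore appear up to three times in the output, each time with a different lifetime.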

[Illustration: TopK over hopping windows]

It is important to understand how TopK behaves with ties. Consider the following example, in which the input stream contains duplicate values and the top 2 values over a tumbling window are computed. This example uses the default output policy, PointAlignToWindowEnd:

[Illustration: TopK operator with PointAlignToWindowEnd policy]

The top 2 values in the first window are 3 and 2. All events whose ranked field holds one of these values are included in the result, which therefore contains three events in this example.
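The same tie behavior can be reproduced with a small LINQ to Objects sketch over tumbling windows. The sketch groups point events, given as non-negative minute offsets, into windows of a fixed size. It keeps every event whose value is among the best distinct values. It models PointAlignToWindowEnd as a point event timestamped at the end of the window. The following are assumptions made for this illustration, not documented StreamInsight behavior:

- the model of the output policy;
- the alignment of windows to minute 0;
- the names PointEvent, PointOutput and TumblingTopK.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical input: a point event at a non-negative minute offset.
public record PointEvent(int Minute, int Val);

// Hypothetical output: PointAlignToWindowEnd modeled as a point event
// timestamped at the end of the window the value was selected from.
public record PointOutput(int Time, int Val);

public static class TumblingTopK
{
    public static List<PointOutput> Run(IEnumerable<PointEvent> input, int size, int n)
    {
        return input
            .GroupBy(e => e.Minute / size)   // tumbling window index, aligned to minute 0
            .OrderBy(g => g.Key)
            .SelectMany(g =>
            {
                int windowEnd = (g.Key + 1) * size;

                // The n best distinct values in this window.
                var topValues = g
                    .Select(e => e.Val)
                    .Distinct()
                    .OrderByDescending(v => v)
                    .Take(n)
                    .ToHashSet();

                // Ties are kept: every event with a top value is returned.
                return g
                    .Where(e => topValues.Contains(e.Val))
                    .OrderByDescending(e => e.Val)
                    .Select(e => new PointOutput(windowEnd, e.Val));
            })
            .ToList();
    }
}
```

Suppose one 10-minute window holds the values 3, 2, 2 and 1. Requesting the top 2 returns three events (3, 2 and 2), all stamped at the window end, because the two events with value 2 share the second rank.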

See Also

StreamInsight Server Concepts
Using Event Windows
Hopping Windows
Snapshot Windows