TopK

本主题中的示例演示如何使用 TopK 操作对窗口流中的事件进行排名,并且基于排名的升序或降序返回前 N 个事件。TopK 通过 Take() 扩展方法进行指定,该方法需要 CepOrderedStream<T> 类型的流。此类型在指定 orderby 子句后将立即生成。orderby 子句必须对 CepWindowStream<T> 流内的事件进行操作。因此,必须对输入流应用快照或跳跃(翻转)窗口运算符。

在具有重复值的情况下,TopK 将返回超过 N 个事件,因此在行为上始终是确定性的。

TopK 不使用基于计数的窗口。

TopK 作为对时间不敏感的运算符实现。因此,输出事件的生存期将根据窗口的输出策略进行设置。

示例

下面的示例从为输入流 inputStream 定义的每个快照窗口获取排名最高的五个事件,并生成一个新事件流。每个窗口中的事件按负载字段 e.f 中值的升序以及负载字段 e.i 中值的降序进行排序。

// Assuming the following input event type for inputStream: 
public class MyPayload 
{
  public int f; 
  public int i; 
}

var topfive = (from window in inputStream.Snapshot()
               from e in window
               orderby e.f ascending, e.i descending
               select e).Take(5);

您可以在 Take 子句中使用投影,如下例所示。

var result = (from win in source
                       .TumblingWindow(TimeSpan.FromMinutes(10), HoppingWindowOutputPolicy.ClipToWindowEnd)
                   from r in win
                   orderby r.Value descending
                   select new { scaled = r.Value / 10, r.SourceId }).Take(2, e => new
                                         {
                                             ranking = e.Rank,
                                             e.Payload. scaled,
                                             e.Payload.SourceId
                                         });

如果需要将实际排名结果提取到事件的负载中,将使用相应的 lambda 表达式。随后可通过 Rank 属性访问排名,同时可通过 Payload 属性访问负载字段。

var topthree = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(2), WindowOutputPolicy.ClipToWindowEnd)
               from e in window
               orderby e.f ascending, e.i descending
               select e).Take(3, e => new
               {
                   ranking = e.Rank,
                   f = e.Payload.f,
                   i = e.Payload.i
               });

下图显示了 TopK 结果的临时属性。该示例使用具有 TopK 的跳跃窗口,这样,将从每个窗口中选出其负载字段 val 具有最高负载值的两个事件。

var two = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(30), TimeSpan.FromMinutes(10), WindowOutputPolicy.ClipToWindowEnd)
               from e in window
               orderby e.val descending
               select e).Take(2);

橙色框表示这些窗口。该图显示 TopK 是如何将所选负载的生存期设置为窗口大小的。在图中,我们在输入流中采用点事件,并且显示其负载字段 val 的值。请注意,ClipToWindowEnd 的指定输出策略将窗口时间范围分配给结果事件。

显示 TopK 结果的临时属性。

理解对于重复值的 TopK 的行为是十分重要的。假定下面这个例子,其中,输入流包含重复值,并且计算针对翻转窗口的前 2 个值。在这个例子中,使用 PointAlignToWindowEnd 的默认输出策略:

带有 PointAlignToWindowEnd 策略的 TopK 运算符

第一个窗口中的前 2 个值是 3 和 2。其排名字段具有这些值之一的所有事件都将包含在结果中,在此示例中将包含三个事件。

请参阅

概念

StreamInsight 服务器概念

使用事件窗口

跳跃窗口

快照窗口