TopK
本主题中的示例演示如何使用 TopK 操作对窗口流中的事件进行排名,并且基于排名的升序或降序返回前 N 个事件。TopK 通过 Take() 扩展方法进行指定,该方法需要 CepOrderedStream<T> 类型的流。此类型在指定 orderby 子句后将立即生成。orderby 子句必须对 CepWindowStream<T> 流内的事件进行操作。因此,必须对输入流应用快照或跳跃(翻转)窗口运算符。
在具有重复值的情况下,TopK 将返回超过 N 个事件,因此在行为上始终是确定性的。
TopK 不使用基于计数的窗口。
TopK 作为对时间不敏感的运算符实现。因此,输出事件的生存期将根据窗口的输出策略进行设置。
示例
下面的示例从为输入流 inputStream 定义的每个快照窗口获取排名最高的五个事件,并生成一个新事件流。每个窗口中的事件按负载字段 e.f 中值的升序以及负载字段 e.i 中值的降序进行排序。
// Assuming the following input event type for inputStream:
public class MyPayload
{
public int f;
public int i;
}
var topfive = (from window in inputStream.Snapshot()
from e in window
orderby e.f ascending, e.i descending
select e).Take(5);
您可以在 Take 子句中使用投影,如下例所示。
var result = (from win in source
.TumblingWindow(TimeSpan.FromMinutes(10), HoppingWindowOutputPolicy.ClipToWindowEnd)
from r in win
orderby r.Value descending
select new { scaled = r.Value / 10, r.SourceId }).Take(2, e => new
{
ranking = e.Rank,
e.Payload. scaled,
e.Payload.SourceId
});
如果需要将实际排名结果提取到事件的负载中,将使用相应的 lambda 表达式。随后可通过 Rank 属性访问排名,同时可通过 Payload 属性访问负载字段。
var topthree = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(2), WindowOutputPolicy.ClipToWindowEnd)
from e in window
orderby e.f ascending, e.i descending
select e).Take(3, e => new
{
ranking = e.Rank,
f = e.Payload.f,
i = e.Payload.i
});
下图显示了 TopK 结果的临时属性。该示例使用具有 TopK 的跳跃窗口,这样,将从每个窗口中选出其负载字段 val 具有最高负载值的两个事件。
var two = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(30), TimeSpan.FromMinutes(10), WindowOutputPolicy.ClipToWindowEnd)
from e in window
orderby e.val descending
select e).Take(2);
橙色框表示这些窗口。该图显示 TopK 是如何将所选负载的生存期设置为窗口大小的。在图中,我们在输入流中采用点事件,并且显示其负载字段 val 的值。请注意,ClipToWindowEnd 的指定输出策略将窗口时间范围分配给结果事件。
理解对于重复值的 TopK 的行为是十分重要的。假定下面这个例子,其中,输入流包含重复值,并且计算针对翻转窗口的前 2 个值。在这个例子中,使用 PointAlignToWindowEnd 的默认输出策略:
第一个窗口中的前 2 个值是 3 和 2。其排名字段具有这些值之一的所有事件都将包含在结果中,在此示例中将包含三个事件。