分组和应用
此主题中的示例演示如何通过使用 LINQ“group by”功能,将事件划分到事件组中。可以对事件组执行聚合和其他操作,因此可以单独计算每个组。应用于每个组的操作集称为“应用分支”。可以在单个组和应用语句中隐式提供应用分支;或者,如果它包含一个较复杂的子查询,则可以作为单独的 LINQ 语句。请注意,应用分支在组和应用构造内结束;例如不可能将已分组的流与组外部的流联接在一起。
示例
以下示例通过指定的 modulo 函数来分组事件。然后,对每个组应用一个快照窗口,并分别计算每个组上负载列的平均值。这样,应用分支将由窗口和聚合构成。
// Assuming the following input event type for inputStream:
public class MyPayload
{
public int i;
public float j;
}
var avgCount = from v in inputStream
group v by v.i % 4 into eachGroup
from window in eachGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { avgNumber = window.Avg(e => e.j) };
上例生成带有单个负载字段的流,在每组的每个快照窗口和每个组内包含字段 j 的平均值。
您也可以将“group by”子句中原始类型的投影分组,如下例所示:
var result = from e in source.AlterEventDuration(e => TimeSpan.FromMinutes(10))
group new { myVal = e.Value * 10 } by e.SourceId into g
from win in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new
{
avg = win.Avg(e => e.myVal)
};
通常,应保留分组键,以便聚合结果可以与各自组关联。下例演示如何检索分组键。
var avgCount = from v in inputStream
group v by v.i % 4 into eachGroup
from window in eachGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { avgNumber = window.Avg(e => e.number), groupId = eachGroup.Key };
可以在几个键上分组,以便输入流中的每个唯一键组合产生一个单独组。在这种情况下,必须在匿名类型定义中包含分组键,以便能够在最终投影中显式检索它们。注意,所有分组字段都必须被引用。以下示例按两个事件负载字段分组事件,并且将一个新的键名称分配给其中一个字段。
// Assuming the following input event type for inputStream:
public class MyPayload
{
public int section;
public string category;
public float value;
}
var avgCount = from v in inputStream
group v by new { sec = v.section, v.category } into eachGroup
from window in eachGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { avgNumber = window.Avg(e => e.value), section = eachGroup.Key.sec, category = eachGroup.Key.category };
应用分支可以更复杂,包含一系列操作,如下例所示:
// Assuming the following input event type for inputStream:
public class MyPayload
{
public int section;
public string category;
public float value;
}
var result = from s in source
group s by s.section into sg
from e in
(from e in sg
group e by e.category into cg
from win in cg.TumblingWindow(TimeSpan.FromMinutes(5), HoppingWindowOutputPolicy.ClipToWindowEnd)
select new { cat = cg.Key, top = win.Max(e => e.value) })
select new { sec = sg.Key, e.cat, e.top };
在以下示例中,假定存在电表读数的流,包含几个电表的数据。该示例使用同一个电表最后 10 分钟的平均值表示每个读数。查询首先按电表 ID 将传入的数据分组。在每个这样的组中,计算 10 分钟内的平均值并联接到原始电表事件。
// Assuming the following input event type for sensorStream:
public class MeterReading
{
public string meterId;
public float usage;
}
var resultB = from s in sensorStream
group s by s.meterId into g
from e in
(from left in g
from right in
(from win in g
.AlterEventDuration(e => TimeSpan.FromMinutes(10))
.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { avg = win.Avg(e => e.usage) })
select new { right.avg, left.usage })
select new { slidingAvg = e.avg, e.usage, g.Key };
如上所述,表示应用分支的函数不能集成除 applyIn 外的任何其他传入流。