分组和应用

此主题中的示例演示如何通过使用 LINQ“group by”功能,将事件划分到事件组中。可以对事件组执行聚合和其他操作,因此可以单独计算每个组。应用于每个组的操作集称为“应用分支”。可以在单个组和应用语句中隐式提供应用分支;或者,如果它包含一个较复杂的子查询,则可以作为单独的 LINQ 语句。请注意,应用分支在组和应用构造内结束;例如不可能将已分组的流与组外部的流联接在一起。

示例

以下示例通过指定的 modulo 函数来分组事件。然后,对每个组应用一个快照窗口,并分别计算每个组上负载列的平均值。这样,应用分支将由窗口和聚合构成。

// Assuming the following input event type for inputStream:
public class MyPayload 
{
    public int i; 
    public float j; 
}

var avgCount = from v in inputStream
               group v by v.i % 4 into eachGroup
               from window in eachGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
               select new { avgNumber = window.Avg(e => e.j) };

上例生成带有单个负载字段的流,在每组的每个快照窗口和每个组内包含字段 j 的平均值。

您也可以将“group by”子句中原始类型的投影分组,如下例所示:

var result = from e in source.AlterEventDuration(e => TimeSpan.FromMinutes(10))
                  group new { myVal = e.Value * 10 } by e.SourceId into g
                  from win in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                  select new
                  {
                      avg = win.Avg(e => e.myVal)
                   };

通常,应保留分组键,以便聚合结果可以与各自组关联。下例演示如何检索分组键。

var avgCount = from v in inputStream
               group v by v.i % 4 into eachGroup
               from window in eachGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
               select new { avgNumber = window.Avg(e => e.number), groupId = eachGroup.Key };

可以在几个键上分组,以便输入流中的每个唯一键组合产生一个单独组。在这种情况下,必须在匿名类型定义中包含分组键,以便能够在最终投影中显式检索它们。注意,所有分组字段都必须被引用。以下示例按两个事件负载字段分组事件,并且将一个新的键名称分配给其中一个字段。

// Assuming the following input event type for inputStream:
public class MyPayload 
{
    public int section; 
    public string category; 
    public float value; 
}

var avgCount = from v in inputStream
               group v by new { sec = v.section, v.category } into eachGroup
               from window in eachGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
               select new { avgNumber = window.Avg(e => e.value), section = eachGroup.Key.sec, category = eachGroup.Key.category };

应用分支可以更复杂,包含一系列操作,如下例所示:

// Assuming the following input event type for inputStream:
public class MyPayload 
{
    public int section; 
    public string category; 
    public float value; 
}

var result = from s in source
                group s by s.section into sg
                from e in
                    (from e in sg
                    group e by e.category into cg
                    from win in cg.TumblingWindow(TimeSpan.FromMinutes(5), HoppingWindowOutputPolicy.ClipToWindowEnd)
                    select new { cat = cg.Key, top = win.Max(e => e.value) })
                select new { sec = sg.Key, e.cat, e.top };

在以下示例中,假定存在电表读数的流,包含几个电表的数据。该示例使用同一个电表最后 10 分钟的平均值表示每个读数。查询首先按电表 ID 将传入的数据分组。在每个这样的组中,计算 10 分钟内的平均值并联接到原始电表事件。

// Assuming the following input event type for sensorStream:
public class MeterReading
{
    public string meterId; 
    public float usage; 
}

var resultB = from s in sensorStream
              group s by s.meterId into g
              from e in
                  (from left in g
                  from right in
                      (from win in g
                          .AlterEventDuration(e => TimeSpan.FromMinutes(10))
                          .SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                      select new { avg = win.Avg(e => e.usage) })
                  select new { right.avg, left.usage })
              select new { slidingAvg = e.avg, e.usage, g.Key };

如上所述,表示应用分支的函数不能集成除 applyIn 外的任何其他传入流。

请参阅

概念

StreamInsight 服务器概念

使用事件窗口

快照窗口