群組及套用
本主題的範例示範如何使用 LINQ 的「群組依據」功能,將事件分割成事件群組。您可以在事件群組上執行彙總和其他作業,好讓每一個群組得以個別計算。套用到每一個群組的作業集合稱為「套用分支」(Apply Branch)。套用分支可以在單一群組和套用陳述式內以隱含方式提供,如果它包含更複雜的子查詢則可以當做個別的 LINQ 陳述式來提供。請注意,套用分支會在群組和套用建構內關閉,例如,群組的資料流無法加入來自群組外部的資料流。
範例
下列範例會依據指定的 modulo 函數來群組事件。然後它會將快照集視窗套用到每一個群組,然後個別計算每一個群組上裝載資料行的平均值。因此,套用分支是由此視窗和彙總所組成。
// Assuming the following input event type for inputStream:
public class MyPayload
{
public int i;
public float j;
}
var avgCount = from v in inputStream
group v by v.i % 4 into eachGroup
from window in eachGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { avgNumber = window.Avg(e => e.j) };
上一個範例產生具有單一裝載欄位的資料流,其中在每一個群組的每一個快照集視窗內包含 j 欄位的平均值。
您也可以在「群組依據」子句中,群組原始類型的投影,如以下範例所示。
var result = from e in source.AlterEventDuration(e => TimeSpan.FromMinutes(10))
group new { myVal = e.Value * 10 } by e.SourceId into g
from win in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new
{
avg = win.Avg(e => e.myVal)
};
通常也應該保留群組索引鍵,好讓彙總結果可以與各自群組產生關聯。下一個範例示範如何擷取群組索引鍵。
var avgCount = from v in inputStream
group v by v.i % 4 into eachGroup
from window in eachGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { avgNumber = window.Avg(e => e.number), groupId = eachGroup.Key };
您可以根據幾個索引鍵來群組,好讓輸入資料流中每一個唯一的索引鍵組合都會產生不同的群組。在此案例中,群組索引鍵必須包含在匿名型別定義中,以便可以在最終投射中明確擷取這些索引鍵。請注意,所有的群組欄位都必須參考。下列範例會依據兩個事件裝載欄位來群組事件,並將新的索引鍵名稱指派給其中一個事件。
// Assuming the following input event type for inputStream:
public class MyPayload
{
public int section;
public string category;
public float value;
}
var avgCount = from v in inputStream
group v by new { sec = v.section, v.category } into eachGroup
from window in eachGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { avgNumber = window.Avg(e => e.value), section = eachGroup.Key.sec, category = eachGroup.Key.category };
套用分支可能更複雜,而且包含一連串的作業,如以下範例所示。
// Assuming the following input event type for inputStream:
public class MyPayload
{
public int section;
public string category;
public float value;
}
var result = from s in source
group s by s.section into sg
from e in
(from e in sg
group e by e.category into cg
from win in cg.TumblingWindow(TimeSpan.FromMinutes(5), HoppingWindowOutputPolicy.ClipToWindowEnd)
select new { cat = cg.Key, top = win.Max(e => e.value) })
select new { sec = sg.Key, e.cat, e.top };
在以下的範例中,系統會假設功率計讀取量的資料流,其中包含數個計量器的資料。此範例會相同計量器最後 10 分鐘平均值的每個讀數加上註解。查詢會先依計量器識別碼群組內送的資料。在每個這種群組中,會計算 10 分鐘的平均值,並加入至原始的計量器事件。
// Assuming the following input event type for sensorStream:
public class MeterReading
{
public string meterId;
public float usage;
}
var resultB = from s in sensorStream
group s by s.meterId into g
from e in
(from left in g
from right in
(from win in g
.AlterEventDuration(e => TimeSpan.FromMinutes(10))
.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { avg = win.Avg(e => e.usage) })
select new { right.avg, left.usage })
select new { slidingAvg = e.avg, e.usage, g.Key };
如上述,表示套用分支的函數無法整合 applyIn 之外的其他任何內送資料流。