Группирование и применение
В примерах из этого раздела показано, как события секционируются по группам с помощью функций «group by» в языке LINQ. Статистическую обработку и другие операции можно выполнять с группами событий, чтобы каждая группа вычислялась отдельно. Набор операций, применяемых к каждой группе, называется ветвью применения. Ветвь применения можно задавать неявно в пределах одной группы и инструкции применения либо в виде отдельной инструкции LINQ, если она содержит более сложный вложенный запрос. Следует учитывать, что ветви применения являются замкнутыми в пределах конструкции группы и применения; например, невозможно соединить сгруппированный поток с потоком, выходящим за пределы группирования.
Примеры
В следующем примере события группируются по указанной функции modulo. Затем к каждой группе применяется окно моментального снимка и вычисляется среднее значение для столбца полезных данных отдельно в каждой группе. Таким образом, ветвь применения состоит из окна и агрегата.
// Assuming the following input event type for inputStream:
public class MyPayload
{
public int i;
public float j;
}
var avgCount = from v in inputStream
group v by v.i % 4 into eachGroup
from window in eachGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { avgNumber = window.Avg(e => e.j) };
В предыдущем примере был создан поток с одним полем полезных данных, которое содержит среднее значения для поля j в каждом окне моментального снимка и для каждой группы.
Можно также сгруппировать проекцию исходного типа в предложении «group by», как показано в следующем примере.
var result = from e in source.AlterEventDuration(e => TimeSpan.FromMinutes(10))
group new { myVal = e.Value * 10 } by e.SourceId into g
from win in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new
{
avg = win.Avg(e => e.myVal)
};
Обычно следует сохранять ключ группирования, чтобы иметь возможность сопоставить результат статистической обработки с нужной группой. В следующем примере показано, как получить ключ группирования.
var avgCount = from v in inputStream
group v by v.i % 4 into eachGroup
from window in eachGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { avgNumber = window.Avg(e => e.number), groupId = eachGroup.Key };
Возможно выполнять группирование по нескольким ключам, чтобы каждое уникальное сочетание клавиш во входном потоке отвечало отдельной группе. В этом случае ключи группирования должны содержаться в определении анонимного типа, чтобы их можно было явно получать в окончательной проекции. Учтите, что необходимо упомянуть все поля группирования. В следующем примере события группируются по двум полям полезных данных, и одному из них присваивается новое имя ключа.
// Assuming the following input event type for inputStream:
public class MyPayload
{
public int section;
public string category;
public float value;
}
var avgCount = from v in inputStream
group v by new { sec = v.section, v.category } into eachGroup
from window in eachGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { avgNumber = window.Avg(e => e.value), section = eachGroup.Key.sec, category = eachGroup.Key.category };
Ветвь применения может быть более сложной, содержащей ряд операций, как показано в примере ниже.
// Assuming the following input event type for inputStream:
public class MyPayload
{
public int section;
public string category;
public float value;
}
var result = from s in source
group s by s.section into sg
from e in
(from e in sg
group e by e.category into cg
from win in cg.TumblingWindow(TimeSpan.FromMinutes(5), HoppingWindowOutputPolicy.ClipToWindowEnd)
select new { cat = cg.Key, top = win.Max(e => e.value) })
select new { sec = sg.Key, e.cat, e.top };
В следующем примере предполагается наличие потока показаний ваттметра, содержащего данные нескольких измерительных приборов. В этом примере каждое показание аннотируется средним значением для того же измерительного прибора за последние 10 минут. В запросе вначале группируются входящие данные по идентификаторам измерительных приборов. В каждой такой группе вычисляется среднее за 10 минут и формируется соединение с исходными событиями измерительного прибора.
// Assuming the following input event type for sensorStream:
public class MeterReading
{
public string meterId;
public float usage;
}
var resultB = from s in sensorStream
group s by s.meterId into g
from e in
(from left in g
from right in
(from win in g
.AlterEventDuration(e => TimeSpan.FromMinutes(10))
.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { avg = win.Avg(e => e.usage) })
select new { right.avg, left.usage })
select new { slidingAvg = e.avg, e.usage, g.Key };
Как было указано выше, функция, которая представляет ветвь применения, не может объединять какие-либо другие входящие потоки, кроме applyIn.