Agrupar e aplicar
Os exemplos deste tópico demonstram como particionar eventos em grupos de eventos usando a funcionalidade "group by" da LINQ. As agregações e outras operações podem ser executadas nos grupos de eventos, de forma que cada grupo seja computado separadamente. O conjunto de operações aplicado a cada grupo é chamado de ramificação de aplicação. A ramificação de aplicação pode ser fornecida implicitamente dentro de uma única instrução de aplicação e agrupamento, ou, se ela contiver uma subconsulta mais complexa, como uma instrução LINQ separada. Observe que as ramificações de aplicação estão fechadas na construção de aplicação e agrupamento; por exemplo, não é possível unir o fluxo agrupado a um fluxo que está fora do agrupamento.
Exemplos
O exemplo a seguir agrupa eventos pela função modulo especificada. Ela aplica uma janela de instantâneo a cada grupo e computa a média em uma coluna de carga em cada grupo separadamente. Assim, a ramificação de aplicação consiste na janela e na agregação.
// Assuming the following input event type for inputStream:
public class MyPayload
{
public int i;
public float j;
}
var avgCount = from v in inputStream
group v by v.i % 4 into eachGroup
from window in eachGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { avgNumber = window.Avg(e => e.j) };
O exemplo anterior produziu um fluxo com um único campo de carga, contendo a média do campo j dentro de cada janela de instantâneo e por grupo.
Também é possível agrupar uma projeção do tipo original em uma cláusula "group by", conforme mostrado no exemplo a seguir.
var result = from e in source.AlterEventDuration(e => TimeSpan.FromMinutes(10))
group new { myVal = e.Value * 10 } by e.SourceId into g
from win in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new
{
avg = win.Avg(e => e.myVal)
};
Normalmente, a chave de agrupamento deve ser retida para que o resultado da agregação possa ser associado ao respectivo grupo. O próximo exemplo mostra como recuperar a chave de agrupamento.
var avgCount = from v in inputStream
group v by v.i % 4 into eachGroup
from window in eachGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { avgNumber = window.Avg(e => e.number), groupId = eachGroup.Key };
É possível agrupar em várias chaves, de forma que cada combinação de chave exclusiva no fluxo de entrada resulte em um grupo separado. Neste caso, as chaves de agrupamento devem estar contidas em uma definição de tipo anônimo para que possam ser recuperadas explicitamente na projeção final. Observe que todos os campos de agrupamento devem ser referenciados. O exemplo a seguir agrupa eventos por dois campos de carga de evento e atribui um novo nome de chave a um deles.
// Assuming the following input event type for inputStream:
public class MyPayload
{
public int section;
public string category;
public float value;
}
var avgCount = from v in inputStream
group v by new { sec = v.section, v.category } into eachGroup
from window in eachGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { avgNumber = window.Avg(e => e.value), section = eachGroup.Key.sec, category = eachGroup.Key.category };
A ramificação de aplicação pode ser mais complexa, contendo uma série de operações, conforme mostrado no exemplo a seguir.
// Assuming the following input event type for inputStream:
public class MyPayload
{
public int section;
public string category;
public float value;
}
var result = from s in source
group s by s.section into sg
from e in
(from e in sg
group e by e.category into cg
from win in cg.TumblingWindow(TimeSpan.FromMinutes(5), HoppingWindowOutputPolicy.ClipToWindowEnd)
select new { cat = cg.Key, top = win.Max(e => e.value) })
select new { sec = sg.Key, e.cat, e.top };
No exemplo a seguir, um fluxo de leituras de medidor de potência é considerado, contendo os dados de vários medidores. O exemplo anota cada leitura com a média dos últimos 10 minutos do mesmo medidor. A consulta agrupa primeiro os dados de entrada por ID de medidor. Em cada grupo desse, a média ao longo de 10 minutos é computada e associada aos eventos de medidor originais.
// Assuming the following input event type for sensorStream:
public class MeterReading
{
public string meterId;
public float usage;
}
var resultB = from s in sensorStream
group s by s.meterId into g
from e in
(from left in g
from right in
(from win in g
.AlterEventDuration(e => TimeSpan.FromMinutes(10))
.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { avg = win.Avg(e => e.usage) })
select new { right.avg, left.usage })
select new { slidingAvg = e.avg, e.usage, g.Key };
Como mencionado anteriormente, a função que representa a ramificação de aplicação não pode integrar nenhum outro fluxo de entrada, exceto applyIn.