TopK
本主題的範例將示範如何使用 TopK 作業來分級視窗資料流中的事件,並根據遞增或遞減的次序來傳回前 N 個事件。TopK 是透過 Take() 擴充方法所指定,而它會預期 CepOrderedStream<T> 類型的資料流。一旦指定了 orderby 子句,就會產生這個類型。orderby 子句必須在 CepWindowStream<T> 資料流中的事件上運作。因此,快照集或跳動 (轉動) 視窗運算子必須套用至輸入資料流。
萬一發生繫結,TopK 將傳回 N 以上的事件,因此它一律以決定性的方式運作。
TopK 無法搭配計數視窗一起運作。
TopK 會實作為時間不緊迫的運算子。因此,輸出事件的存留期間將會根據視窗的輸出原則而設定。
範例
下列範例會從針對輸入資料流 inputStream 所定義的每個快照集視窗中取得前五個事件並產生新的事件資料流。每一個視窗中的事件都會依照裝載欄位 e.f 中值的遞增順序並結合裝載欄位 e.i 中值的遞減順序來排序。
// Assuming the following input event type for inputStream:
public class MyPayload
{
public int f;
public int i;
}
var topfive = (from window in inputStream.Snapshot()
from e in window
orderby e.f ascending, e.i descending
select e).Take(5);
您可以在 Take 子句中使用投射,如下列範例所示。
var result = (from win in source
.TumblingWindow(TimeSpan.FromMinutes(10), HoppingWindowOutputPolicy.ClipToWindowEnd)
from r in win
orderby r.Value descending
select new { scaled = r.Value / 10, r.SourceId }).Take(2, e => new
{
ranking = e.Rank,
e.Payload. scaled,
e.Payload.SourceId
});
如果實際的分級結果需要投射到事件的裝載中,就會使用對應的 Lambda 運算式。然後,系統會透過 Rank 屬性存取分級,而裝載欄位則可透過 Payload 屬性存取。
var topthree = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(10), TimeSpan.FromMinutes(2), WindowOutputPolicy.ClipToWindowEnd)
from e in window
orderby e.f ascending, e.i descending
select e).Take(3, e => new
{
ranking = e.Rank,
f = e.Payload.f,
i = e.Payload.i
});
下圖顯示 TopK 結果的暫時屬性。這個範例搭配 TopK 使用跳動視窗,以便從每一個視窗針對裝載欄位 val 挑選最高裝載值的兩個事件。
var two = (from window in inputStream.HoppingWindow(TimeSpan.FromMinutes(30), TimeSpan.FromMinutes(10), WindowOutputPolicy.ClipToWindowEnd)
from e in window
orderby e.val descending
select e).Take(2);
橙色方塊表示視窗。此圖顯示 TopK 如何將選定裝載的存留期間設定為視窗大小。在這裡,我們假設時間點事件位於輸入資料流中,顯示其裝載欄位 val 的值。請注意,指定之 ClipToWindowEnd 的輸出原則會將視窗的時間範圍指派到結果事件。
請務必了解用於繫結之 TopK 的行為。假設下列範例中,輸入資料流包含重複的值,而且會透過輪轉視窗計算前兩個值。在此範例中,會使用 PointAlignToWindowEnd 的預設輸出原則:
第一個視窗中的前兩個值為 3 和 2。其分級欄位擁有以下其中一個值的所有事件將包含在結果中,此結果會在這個範例中包含三個事件。