共用方式為


可列舉事件來源及事件接收的端對端範例 (StreamInsight)

這個簡單的端對端範例將示範如何使用實作 IEnumerable 介面的事件來源和事件接收來建立完整的 StreamInsight 應用程式:

  1. Step 1 - Provide an observable or enumerable event source

  2. Step 2 - Transform the input to a stream and describe the temporal characteristics

  3. Step 3 - Write a query

  4. Step 4 - Convert the query output to an observable or enumerable stream

  5. Step 5 - Consume the output

這個範例會使用 StreamInsight 搭配 SQL Server 和 ADO.NET Entity Framework 來回應針對 Northwind 範例資料庫之歷程記錄資料所做的時間相關查詢。此查詢會尋找在某個地區內有 3 筆以上訂單有效的時間間隔。

此範例會使用 IEnumerable 事件來源。使用實作 IObservable 之事件來源的步驟都很相似。不過,可觀察的輸出會將資料發送至觀察器。取用者不需要提取資料,如同針對可列舉的來源呼叫 foreach 時這樣做。

步驟 1 - 提供可觀察或可列舉的事件來源

首先,請針對 Northwind 資料庫發出 LINQ to Entities 查詢,藉以定義查詢的來源資料。根據預設,其結果 (databaseQuery) 會實作 IEnumerable 介面。

// Connect to the Northwind sample database on SQL Server. Use the default Entity Model
// generated by the Entity Framework for the Northwind sample database.
using (NorthwindEntities northwind = new NorthwindEntities())
{
    // Query all Orders where there is a known order date, ship date and ship region.
    var databaseQuery = from o in northwind.Orders
                        where o.OrderDate.HasValue && o.ShippedDate.HasValue && o.ShipRegion != null
                        orderby o.OrderDate.Value
                        select o;
}

步驟 2 - 將輸入轉換成資料流並描述資料流的暫時特性

接著,請將查詢的結果轉換成間隔事件的資料流:

// Transform the query results into a stream of interval events whose start and end 
// times are defined by the order and ship timestamps. Keep track of the shipping region.
var streamSource = databaseQuery
    .ToStream(application, AdvanceTimeSettings.IncreasingStartTime, 
        o => IntervalEvent.CreateInsert(
            o.OrderDate.Value,
            o.ShippedDate.Value, 
            new { o.ShipRegion }));

這段程式碼會使用 AdvanceTimeSettings 類別的 Helper (IncreasingStartTime),在 delay 為 0 (零) 的每個事件之後插入 CTI 事件。或者,您也可以使用 StrictlyIncreasingStartTime 來指定 -1 刻度的 delay (因而在事件開始時間之後的一個刻度放置 CTI),或使用 UnorderedTimestamps 來指定 delay 的自訂值。

然後,IntervalEvent 類別的 CreateInsert 方法就會提供 OrderDate 做為開始時間、提供 ShippedDate 做為間隔的結束時間,並且提供 ShipRegion 做為事件的裝載,藉以將來源資料轉換成事件的資料流。

類似的 CreateInsert 方法是由 PointEvent 類別提供,而 EdgeEvent 類別具有 CreateStart 和 CreateEnd 方法。這 3 個事件類別都具有 CreateCti 方法,可循序插入 CTI 事件,而非以宣告方式使用 AdvanceTimeSettings。

步驟 3 - 撰寫查詢

接著,請撰寫適用於事件之內送資料流的時間感知 StreamInsight 查詢:

// Find time intervals during which more than 3 orders are in process within a region.
var streamQuery = from o in streamSource
                  group o by o.ShipRegion into g
                  from window in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                  select new { OrderCount = window.Count(), ShipRegion = g.Key } into agg
                  where agg.OrderCount > 3
                  select agg;

這個查詢會依照地區分組每個時間間隔中的事件,然後僅選取 3 個以上事件有效的這些間隔。它會將結果投射到具有完全不同裝載的新資料流中,而且這個裝載包含有效訂單的計數和 ShipRegion 的識別碼。

如需有關撰寫查詢範本的詳細資訊,請參閱:

步驟 4 - 將查詢輸出轉換成可觀察或可列舉的事件接收

接著,請將查詢的輸出資料流轉換成可列舉的結果:

// Convert temporal query results into an enumerable result of interval events. This example
// filters out CTI events, and projects the relevant portions of the interval event.
var results = from intervalEvent in streamQuery.ToIntervalEnumerable()
              where intervalEvent.EventKind != EventKind.CTI
              select new 
              { 
                  intervalEvent.StartTime, 
                  intervalEvent.EndTime, 
                  intervalEvent.Payload.OrderCount,
                  intervalEvent.Payload.ShipRegion,
              };

這個查詢會篩選出 CTI 事件,並且僅將插入事件投射到間隔事件的可列舉資料流中。具有 4 個欄位的新匿名類型包含事件的裝載。

除了 ToIntervalEnumerable 方法以外,相關的擴充方法包括:

  • ToPointEnumerable 和ToEdgeEnumerable

  • ToPointObservable、ToIntervalObservable 和 ToEdgeObservable

這些方法會透過提供查詢名稱和查詢描述來識別管理和偵錯目的的查詢,傳回擴充基底 IEnumerable 和 IObservable 介面的 ICepEnumerable 或 ICepObservable 介面。

ICepEnumerable 或 ICepObservable 介面也會提供透過選取範圍 (Where) 或投射 (Select) 篩選輸出事件的 Helper 方法。例如:

observableOutput = result
    .ToPointObservable()
    .Where( e => e.EventKind != EventKind.Cti)
    .Select(e => e.Payload);

步驟 5 - 取用輸出

最後,請取用查詢的結果。請注意,由於一般 LINQ 提供者的延遲評估模型,所以系統會等到取用者開始列舉或觀察結果時,才評估查詢:

// Enumerating the results triggers the underlying SQL Server and StreamInsight queries.
foreach (var activeInterval in results)
{
    Console.WriteLine("Between {0} and {1}, {2} orders were active in the '{3}' region.", 
        activeInterval.StartTime, 
        activeInterval.EndTime, 
        activeInterval.OrderCount,
        activeInterval.ShipRegion);
}