可枚举事件源和事件接收器的端到端示例 (StreamInsight)
这一简单的端到端示例演示了如何使用事件源和事件接收器来实现 IEnumerable 接口,以创建完整的 StreamInsight 应用程序:
Step 2 - Transform the input to a stream and describe the temporal characteristics
Step 4 - Convert the query output to an observable or enumerable stream
此示例将 StreamInsight 与 SQL Server 和 ADO.NET Entity Framework 结合使用,共同回答针对 Northwind 示例数据库中的历史数据的时间相关查询。该查询用来查找符合以下条件的时间间隔:在该时间间隔中某地区有 3 个以上处于活动状态的订单。
此示例使用一个 IEnumerable 事件源。使用实现 IObservable 的事件源的步骤是类似的。但是,可观察的输出会将数据推送给观察者 - 使用者不必像在针对可枚举源调用 foreach 时那样请求数据。
步骤 1 - 提供可观察或可枚举的事件源
首先,通过对 Northwind 数据库发出 LINQ to Entities 查询定义查询的源数据。结果 (databaseQuery) 在默认情况下实现 IEnumerable 接口。
// Connect to the Northwind sample database on SQL Server. Use the default Entity Model
// generated by the Entity Framework for the Northwind sample database.
using (NorthwindEntities northwind = new NorthwindEntities())
{
// Query all Orders where there is a known order date, ship date and ship region.
var databaseQuery = from o in northwind.Orders
where o.OrderDate.HasValue && o.ShippedDate.HasValue && o.ShipRegion != null
orderby o.OrderDate.Value
select o;
}
步骤 2 - 将输入转换为流,并描述该流的临时特征。
接下来,将查询的结果转换为间隔事件流:
// Transform the query results into a stream of interval events whose start and end
// times are defined by the order and ship timestamps. Keep track of the shipping region.
var streamSource = databaseQuery
.ToStream(application, AdvanceTimeSettings.IncreasingStartTime,
o => IntervalEvent.CreateInsert(
o.OrderDate.Value,
o.ShippedDate.Value,
new { o.ShipRegion }));
此代码使用 AdvanceTimeSettings 类的帮助器 - IncreasingStartTime - 在每个事件后插入 delay 为 0(零)的 CTI 事件。或者,您也可以使用 StrictlyIncreasingStartTime 指定 delay 为 -1 个时钟周期(这样可将 CTI 置于事件开始时间“之后”一个时钟周期),或使用 UnorderedTimestamps 为 delay 指定自定义值。
然后,IntervalEvent 类的 CreateInsert 方法通过提供 OrderDate 作为开始时间,将 Shipped Date 作为间隔的结束时间,将 ShipRegion 作为事件负载,以此方式将源数据转换为事件流。
PointEvent 类提供了类似的 CreateInsert 方法,而 EdgeEvent 类具有 CreateStart 和 CreateEnd 方法。三个事件类全都具有 CreateCti 方法,可在过程中插入 CTI 事件,而不是通过声明 AdvanceTimeSettings 来插入 CTI 事件。
步骤 3 - 编写查询
接下来,编写适用于传入事件流的可感知时间的 StreamInsight 查询。
// Find time intervals during which more than 3 orders are in process within a region.
var streamQuery = from o in streamSource
group o by o.ShipRegion into g
from window in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { OrderCount = window.Count(), ShipRegion = g.Key } into agg
where agg.OrderCount > 3
select agg;
此查询按地区对每个时间间隔中的事件进行分组,然后仅选择有超过 3 个活动事件的那些间隔。它将结果投影到具有完全不同的负载(包括活动订单数和 ShipRegion 的标识符)的新流中。
有关编写查询模板的详细信息,请参阅:
步骤 4 - 将查询输出转换为可观察或可枚举的事件接收器
接下来,将查询的输出流转换为可枚举的结果:
// Convert temporal query results into an enumerable result of interval events. This example
// filters out CTI events, and projects the relevant portions of the interval event.
var results = from intervalEvent in streamQuery.ToIntervalEnumerable()
where intervalEvent.EventKind != EventKind.CTI
select new
{
intervalEvent.StartTime,
intervalEvent.EndTime,
intervalEvent.Payload.OrderCount,
intervalEvent.Payload.ShipRegion,
};
此查询筛选掉 CTI 事件并仅将插入事件投影为可枚举的间隔事件流。具有 4 个字段的新匿名类型包含事件的负载。
除 ToIntervalEnumerable 方法之外,相关的扩展方法包括:
ToPointEnumerable 和ToEdgeEnumerable
ToPointObservable、ToIntervalObservable 和 ToEdgeObservable
这些方法返回 ICepEnumerable 或 ICepObservable 接口,这些接口通过提供查询名称和查询说明来标识查询以便进行管理和调试,从而扩展了 IEnumerable 和 IObservable 基础接口。
ICepEnumerable 或 ICepObservable 接口还提供了可通过选择 (Where) 或投影 (Select) 来筛选输出事件的帮助器方法。例如:
observableOutput = result
.ToPointObservable()
.Where( e => e.EventKind != EventKind.Cti)
.Select(e => e.Payload);
步骤 5 - 使用输出
最后,使用查询结果。请注意,由于典型 LINQ 提供程序的延迟求值模型的原因,只有在使用者开始枚举或观察结果后才开始对查询求值。
// Enumerating the results triggers the underlying SQL Server and StreamInsight queries.
foreach (var activeInterval in results)
{
Console.WriteLine("Between {0} and {1}, {2} orders were active in the '{3}' region.",
activeInterval.StartTime,
activeInterval.EndTime,
activeInterval.OrderCount,
activeInterval.ShipRegion);
}