End-to-End Example of an Enumerable Event Source and Event Sink (StreamInsight)
This simple end-to-end demonstrates the use of an event source and event sink that implement the IEnumerable interface to create a complete StreamInsight application:
Step 2 - Transform the input to a stream and describe the temporal characteristics
Step 4 - Convert the query output to an observable or enumerable stream
This example uses StreamInsight in combination with SQL Server and the ADO.NET Entity Framework to answer a time-related query over historical data from the Northwind sample database. The query finds time intervals during which more than 3 orders were active within a region.
This example uses an IEnumerable event source. The steps for using an event source that implements IObservable are similar. However an observable output pushes data to the observer – the consumer does not have to pull the data, as it does when calling foreach over an enumerable source.
Step 1 - Provide an observable or enumerable event source
First, define the source data for the query by issuing a LINQ to Entities query over the Northwind database. The result (databaseQuery) by default implements the IEnumerable interface.
// Connect to the Northwind sample database on SQL Server. Use the default Entity Model
// generated by the Entity Framework for the Northwind sample database.
using (NorthwindEntities northwind = new NorthwindEntities())
{
// Query all Orders where there is a known order date, ship date and ship region.
var databaseQuery = from o in northwind.Orders
where o.OrderDate.HasValue && o.ShippedDate.HasValue && o.ShipRegion != null
orderby o.OrderDate.Value
select o;
}
Step 2 - Transform the input to a stream and describe the temporal characteristics of the stream
Next, transform the result of the query into a stream of interval events:
// Transform the query results into a stream of interval events whose start and end
// times are defined by the order and ship timestamps. Keep track of the shipping region.
var streamSource = databaseQuery
.ToStream(application, AdvanceTimeSettings.IncreasingStartTime,
o => IntervalEvent.CreateInsert(
o.OrderDate.Value,
o.ShippedDate.Value,
new { o.ShipRegion }));
This code uses a helper on the AdvanceTimeSettings class - IncreasingStartTime - to insert CTI events after each event with a delay of 0 (zero). Alternately, you can use StrictlyIncreasingStartTime to specify a delay of -1 tick (thus placing the CTI one tick after the event’s start time), or UnorderedTimestamps to specify a custom value for delay.
Then, the CreateInsert method of the IntervalEvent class converts the source data to a stream of events by providing the OrderDate as the start time, the Shipped Date as the end time of the interval, and the ShipRegion as the payload of the event.
An analogous CreateInsert method is provided by the PointEvent class, while the EdgeEvent class has CreateStart and CreateEnd methods. All 3 of the event classes have a CreateCti method for inserting CTI events procedurally, rather than through the declarative use of AdvanceTimeSettings.
Step 3 - Write a query
Next, write the time-aware StreamInsight query that is appropriate for the incoming stream of events:
// Find time intervals during which more than 3 orders are in process within a region.
var streamQuery = from o in streamSource
group o by o.ShipRegion into g
from window in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
select new { OrderCount = window.Count(), ShipRegion = g.Key } into agg
where agg.OrderCount > 3
select agg;
This query groups the events in each time interval by region, then selects only those intervals where more than 3 events were active. It projects the results into a new stream with an entirely different payload that includes the count of active orders and the identifier for the ShipRegion.
For more information about writing query templates, see:
Step 4 - Convert the query output to an observable or enumerable event sink
Next, transform the output stream from the query into an enumerable result:
// Convert temporal query results into an enumerable result of interval events. This example
// filters out CTI events, and projects the relevant portions of the interval event.
var results = from intervalEvent in streamQuery.ToIntervalEnumerable()
where intervalEvent.EventKind != EventKind.CTI
select new
{
intervalEvent.StartTime,
intervalEvent.EndTime,
intervalEvent.Payload.OrderCount,
intervalEvent.Payload.ShipRegion,
};
This query filters out the CTI events and projects only the insert events into an enumerable stream of interval events. A new anonymous type with 4 fields contains the payload of the events.
In addition to the ToIntervalEnumerable method, related extension methods include:
ToPointEnumerable and ToEdgeEnumerable
ToPointObservable, ToIntervalObservable, and ToEdgeObservable
These methods return ICepEnumerable or ICepObservable interfaces that extend the base IEnumerable and IObservable interfaces by providing a query name and query description to identify the query for management and debugging purposes.
The ICepEnumerable or ICepObservable interfaces also provide helper methods that filter output events through selection (Where) or projection (Select). For example:
observableOutput = result
.ToPointObservable()
.Where( e => e.EventKind != EventKind.Cti)
.Select(e => e.Payload);
Step 5 - Consume the output
Finally, consume the results of the query. Note that due to the deferred evaluation model of the typical LINQ provider, queries are not evaluated until the consumer begins to enumerate or observe the results:
// Enumerating the results triggers the underlying SQL Server and StreamInsight queries.
foreach (var activeInterval in results)
{
Console.WriteLine("Between {0} and {1}, {2} orders were active in the '{3}' region.",
activeInterval.StartTime,
activeInterval.EndTime,
activeInterval.OrderCount,
activeInterval.ShipRegion);
}