StreamInsight Sequence Integration: Five Easy Pieces
Over the past few weeks, I’ve spent time building a handful of applications using the new sequence integration APIs in StreamInsight 1.1. I think StreamInsight veterans will be pleasantly surprised at the seamlessness of the experience! If you’re new to StreamInsight, now’s your chance to quickly build a temporally aware application. In this post, I’ll walk through five components of a typical end-to-end StreamInsight query, from event source to event sink.
First, take some time to download the latest version and kick the tires:
- .NET 4.0 Download (.NET 3.5 SP1 is sufficient if you are not using IObservable<> event sources or event sinks).
- StreamInsight 1.1 Download
- StreamInsight Samples
Now you're ready to build a StreamInsight application. A few considerations:
- If you are creating a .NET 4.0 application, make sure it is targeting the full .NET 4.0 profile, not the client profile. Configure the target framework through project properties in Visual Studio 2010.
- Add a reference to the Microsoft.ComplexEventProcessing assembly. If you plan on using an IObservable<> event source or event sink, add a reference to the Microsoft.ComplexEventProcessing.Observable assembly as well.
- Use the Microsoft.ComplexEventProcessing and Microsoft.ComplexEventProcessing.Linq namespaces. The first allows you to embed and manage a StreamInsight server. The second exposes the StreamInsight LINQ dialect.
Now we'll travel downstream, using the SequenceIntegration\Northwind sample as our guide.
Event sources
Event sources for a query can be based on custom input adapters, other StreamInsight queries, or, new in version 1.1, .NET IObservable<> and IEnumerable<> sequences. The good news: in the .NET world, IEnumerable<> pull-based sequences are pervasive. SQL, OData, SharePoint, you name it. Better news from the perspective of a Complex Event Processing system: asynchronous and push-based sequences can be easily exposed via the IObservable<> interface, particularly if you take advantage of the .NET Reactive Framework (Rx).
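To make the pull versus push distinction concrete, here is a minimal sketch in plain .NET, with no StreamInsight or Rx dependency. The names TickSource, CollectingObserver, and PushVersusPull are hypothetical and exist only for illustration; in practice Rx would supply most of this plumbing for you.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical push-based source: it calls its observers when a value arrives.
public sealed class TickSource : IObservable<int>
{
    private readonly List<IObserver<int>> observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        observers.Add(observer);
        return new Subscription(observers, observer);
    }

    // Push one value to every current subscriber.
    public void Publish(int value)
    {
        foreach (var observer in observers.ToArray())
        {
            observer.OnNext(value);
        }
    }

    private sealed class Subscription : IDisposable
    {
        private readonly List<IObserver<int>> observers;
        private readonly IObserver<int> observer;

        public Subscription(List<IObserver<int>> observers, IObserver<int> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        // Disposing the subscription stops further deliveries.
        public void Dispose()
        {
            observers.Remove(observer);
        }
    }
}

// Hypothetical observer that records whatever is pushed to it.
public sealed class CollectingObserver : IObserver<int>
{
    public readonly List<int> Received = new List<int>();

    public void OnNext(int value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class PushVersusPull
{
    // Pull-based source: nothing is produced until the consumer asks.
    public static IEnumerable<int> PullSource()
    {
        yield return 1;
        yield return 2;
        yield return 3;
    }

    public static void Main()
    {
        foreach (var value in PullSource())
        {
            Console.WriteLine("pulled " + value);
        }

        var source = new TickSource();
        var observer = new CollectingObserver();
        using (source.Subscribe(observer))
        {
            source.Publish(1);
            source.Publish(2);
        }
        source.Publish(3); // Not delivered: the subscription was disposed.

        Console.WriteLine("pushed " + string.Join(", ", observer.Received));
    }
}
```

With IEnumerable<> the consumer drives the loop; with IObservable<> the source decides when values arrive, which is usually the better fit for live event feeds.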
In this simple example, we use two OData service queries as our event sources:
NorthwindEntities northwind = new NorthwindEntities(
    new Uri("https://services.odata.org/Northwind/Northwind.svc"));
// Issue OData queries to determine start and end times for orders.
// So that the sources behave like temporal streams, we order by the
// corresponding dates.
var ordersWithRegions =
    from o in northwind.Orders
    where o.ShipRegion != null
    select o;
var orderStartTimes =
    from o in ordersWithRegions
    where o.OrderDate != null
    orderby o.OrderDate
    select new { StartTime = (DateTime)o.OrderDate, o.OrderID, o.ShipRegion };
var orderEndTimes =
    from o in ordersWithRegions
    where o.ShippedDate != null
    orderby o.ShippedDate
    select new { EndTime = (DateTime)o.ShippedDate, o.OrderID };
The orderStartTimes and orderEndTimes queries implement IEnumerable<>, which makes them suitable event sources for StreamInsight.
Temporal streams
An IObservable<> or IEnumerable<> event source can feed a temporal stream. A temporal stream is a sequence of events annotated with temporal information: timestamps for events and punctuation indicating when a particular point in time has been committed. In the above example, we have two sources, orderStartTimes and orderEndTimes, each including a timestamp field (StartTime and EndTime respectively) as well as a commitment, based on the orderby clauses, that event timestamps are monotonic. We describe these temporal characteristics to StreamInsight using the ToPointStream method:
// Map OData queries to StreamInsight inputs
var startStream = orderStartTimes.ToPointStream(orderApp, s =>
    PointEvent.CreateInsert(s.StartTime, s),
    AdvanceTimeSettings.IncreasingStartTime);
var endStream = orderEndTimes.ToPointStream(orderApp, e =>
    PointEvent.CreateInsert(e.EndTime, e),
    AdvanceTimeSettings.IncreasingStartTime);
The arguments to the ToPointStream extension method are described below:
- IEnumerable<T> source: the event source. In this case, the source is an OData query.
- Application application: the StreamInsight application that will host the temporal query.
- Func<TInput, PointEvent<TPayload>> selector: takes an element of the source and turns it into a temporal event. The first selector loosely reads: “the event s happened at the point in time s.StartTime”. The first argument to PointEvent.CreateInsert indicates the timestamp for the event, and the second argument describes the payload of the event, in this case the entire row.
  Aside: StreamInsight has restrictions on payload types. Basically, the payload must consist of a type or struct with only “primitive” (string, number, etc.) fields and properties. See the Payload Field Requirements section at https://msdn.microsoft.com/en-us/library/ee378905.aspx for details. The event selector can be used to reshape your inputs to a supported payload type.
- AdvanceTimeSettings advanceTimeSettings: an optional parameter that describes a policy for automatically generating punctuation. In the above example, we indicate that timestamps are increasing in the input. StreamInsight can then automatically generate Current Time Increment (CTI) punctuation for each event: “we commit to everything before the event’s timestamp”.
- string streamName: an optional parameter allowing you to assign a name to the input stream. I have not specified a stream name in the above example. This feature is particularly useful if you’re importing punctuations from one stream to another (see the SequenceIntegration\PerformanceCounters sample for instance).
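If the idea of per-event punctuation is new to you, the following plain C# simulation may help. It does not call StreamInsight at all: Stamped, IncreasingStartTimeSketch, and Annotate are hypothetical names, the order data is made up, and the choice to emit the CTI marker just before each insert and to throw on out-of-order input is an assumption of the sketch rather than a statement about how the engine behaves.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical stand-in for a timestamped source element.
public sealed class Stamped
{
    public DateTime Time;
    public string Payload;

    public Stamped(DateTime time, string payload)
    {
        Time = time;
        Payload = payload;
    }
}

public static class IncreasingStartTimeSketch
{
    // Emits a "CTI" line and then an "INSERT" line for each element.
    // Throwing on a decreasing timestamp is a simplification of this sketch.
    public static IEnumerable<string> Annotate(IEnumerable<Stamped> source)
    {
        DateTime? previous = null;
        foreach (var item in source)
        {
            if (previous.HasValue && item.Time < previous.Value)
            {
                throw new InvalidOperationException("Timestamps must not decrease.");
            }

            string stamp = item.Time.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture);

            // "We commit to everything before this event's timestamp."
            yield return "CTI " + stamp;
            yield return "INSERT " + stamp + " " + item.Payload;
            previous = item.Time;
        }
    }

    public static void Main()
    {
        // Made-up orders, already sorted by time (as the orderby clause guarantees).
        var orders = new List<Stamped>
        {
            new Stamped(new DateTime(2010, 1, 4), "order A"),
            new Stamped(new DateTime(2010, 1, 6), "order B"),
            new Stamped(new DateTime(2010, 1, 6), "order C")
        };

        foreach (var line in Annotate(orders))
        {
            Console.WriteLine(line);
        }
    }
}
```

The point to take away is that a sorted source lets the punctuation be derived from the data itself, so you do not have to produce CTIs by hand.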
Note that there are several variations on the To*Stream method supporting IObservable<> or IEnumerable<> event sources and the shaping of point, interval or edge data.
Take a look at Cip’s Time in StreamInsight series for more information.
Temporal query
Now that we have described the temporal characteristics of our event sources, we can compose a StreamInsight query. I’ll simply copy the code here without too much explanation since I’m focused on data ingress and egress in this post:
// Use clip to synthesize events lasting from the start of each order to the end
// of each order.
var clippedStream = startStream
    .AlterEventDuration(e => TimeSpan.MaxValue)
    .ClipEventDuration(endStream, (s, e) => s.OrderID == e.OrderID);
// Count the number of coincident orders per region
var counts = from o in clippedStream
             group o by o.ShipRegion into g
             from win in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
             select new { ShipRegion = g.Key, Count = win.Count() };
// Display output whenever there are more than 2 active orders in a region.
const int threshold = 2;
var query = from c in counts
            where c.Count > threshold
            select c;
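For readers who want to see what the snapshot count computes, here is a rough LINQ to Objects approximation that runs without StreamInsight. It is only a sketch under stated assumptions: OrderInterval, RegionCount, and SnapshotCountSketch are hypothetical names, time is reduced to integer day numbers, each interval is treated as half-open (start inclusive, end exclusive), and the intervals stand in for the already clipped stream.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for one clipped order event: [StartDay, EndDay).
public sealed class OrderInterval
{
    public string ShipRegion;
    public int StartDay;
    public int EndDay;

    public OrderInterval(string shipRegion, int startDay, int endDay)
    {
        ShipRegion = shipRegion;
        StartDay = startDay;
        EndDay = endDay;
    }
}

// Hypothetical result row: the order count for a region during one snapshot.
public sealed class RegionCount
{
    public string ShipRegion;
    public int StartDay;
    public int EndDay;
    public int Count;
}

public static class SnapshotCountSketch
{
    public static List<RegionCount> CountCoincident(IEnumerable<OrderInterval> orders)
    {
        var results = new List<RegionCount>();
        foreach (var region in orders.GroupBy(o => o.ShipRegion))
        {
            // Every start or end is a boundary where the count may change.
            var boundaries = region
                .SelectMany(o => new[] { o.StartDay, o.EndDay })
                .Distinct()
                .OrderBy(day => day)
                .ToList();

            // Each pair of adjacent boundaries delimits one snapshot.
            for (int i = 0; i + 1 < boundaries.Count; i++)
            {
                int windowStart = boundaries[i];
                int windowEnd = boundaries[i + 1];
                int count = region.Count(
                    o => o.StartDay <= windowStart && windowEnd <= o.EndDay);
                if (count > 0)
                {
                    results.Add(new RegionCount
                    {
                        ShipRegion = region.Key,
                        StartDay = windowStart,
                        EndDay = windowEnd,
                        Count = count
                    });
                }
            }
        }
        return results;
    }

    public static void Main()
    {
        // Made-up data: three overlapping orders in WA, one order in OR.
        var orders = new List<OrderInterval>
        {
            new OrderInterval("WA", 1, 5),
            new OrderInterval("WA", 2, 6),
            new OrderInterval("WA", 3, 4),
            new OrderInterval("OR", 1, 3)
        };

        const int threshold = 2;
        foreach (var c in CountCoincident(orders).Where(c => c.Count > threshold))
        {
            Console.WriteLine(
                c.ShipRegion + " [" + c.StartDay + ", " + c.EndDay + ") count " + c.Count);
        }
    }
}
```

In the made-up data, only the WA snapshot from day 3 to day 4 has more than two active orders. The StreamInsight query expresses the same idea declaratively and evaluates it incrementally as events arrive.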
Event sink
Creating an event sink is straightforward. Several extension methods support the transformation of a temporal query (CepStream<>) to an event sink, with support for permutations of IObservable or IEnumerable sequences and TPayload, PointEvent<TPayload>, IntervalEvent<TPayload> or EdgeEvent<TPayload> elements. In the following example, we translate the query to a sequence of interval events using ToIntervalEnumerable. We then keep only the insert events, skipping CTI punctuation, and project out the relevant temporal and payload fields:
// Map the query to an IEnumerable sink
var sink = from i in query.ToIntervalEnumerable()
           where i.EventKind == EventKind.Insert
           select new { i.StartTime, i.EndTime, i.Payload.Count, i.Payload.ShipRegion };
Consuming results
Now that we’re back in the world of .NET sequences, there are many possibilities for consuming the results. For now, we’ll just write the event sink contents to the console:
foreach (var r in sink)
{
    Console.WriteLine(r);
}
Interestingly, calling GetEnumerator on sink triggers a sequence of actions:
- A stream query is deployed to the embedded server and started, which implicitly spins up the query inputs, triggering…
- calls to GetEnumerator on the event sources we defined earlier, which…
- cause a query against the OData service to be executed.
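The same kind of deferred chain can be observed with ordinary C# iterators. The sketch below is an analogy only and does not involve StreamInsight: DeferredChainSketch, Source, Query, and Log are hypothetical names, and with compiler-generated iterators the work starts when the foreach first pulls an element.

```csharp
using System;
using System.Collections.Generic;

public static class DeferredChainSketch
{
    // Records the order in which each stage actually starts running.
    public static readonly List<string> Log = new List<string>();

    // Stands in for the event source (the OData query in this post).
    public static IEnumerable<int> Source()
    {
        Log.Add("source started");
        yield return 1;
        yield return 2;
        yield return 3;
    }

    // Stands in for the query and sink layered on top of the source.
    public static IEnumerable<int> Query(IEnumerable<int> input)
    {
        Log.Add("query started");
        foreach (var value in input)
        {
            yield return value * 10;
        }
    }

    public static void Main()
    {
        // Composing the pipeline runs none of the code above.
        var sink = Query(Source());
        Console.WriteLine("log entries before enumeration: " + Log.Count);

        // Enumerating the sink starts the query, which in turn starts the source.
        foreach (var r in sink)
        {
            Console.WriteLine(r);
        }

        Console.WriteLine(string.Join(" -> ", Log));
    }
}
```

Nothing is logged until the loop begins, and the outer stage starts before the inner one, mirroring the order of the steps listed above.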
StreamInsight queries now compose seamlessly with other LINQ providers! In fact, if you review the code above, you can see that we’ve leveraged LINQ to OData, LINQ to Objects, and LINQ to StreamInsight. The SequenceIntegration samples linked above illustrate some other integration possibilities as well:
- A WPF control that observes an IObservable event sink.
- An IObservable event source that polls performance counters.
- An IEnumerable event source that reads the contents of a file.
There – you have the five easy pieces! Comes with a side-order of toast.
Comments
Anonymous
December 08, 2010
What is NorthwindEntities ???? Can't find it.
Anonymous
January 05, 2011
NorthwindEntities is an Entity Framework data context backed by the SQL Server Northwind database. It is included in sequence integration sample project that can be downloaded @streaminsight.codeplex.com/.../46435.
Anonymous
May 18, 2015
for the "Consuming results" part (the foreach statement), it is executed once. How can I continuously consume the result? Should I write it as following: while (true) { foreach (var r in sink) { Console.WriteLine(r); } }