StreamInsight: Creating a LINQPad data context
This blog post is for those folks comfortable with creating a data stream from an IEnumerable, IObservable or StreamInsight adapter data source. If you’d like more details on creating those, have a look at this blog post, which covers creating an IEnumerable ‘canned’ data source then wrapping a LINQPad context around it.
Creating a data context is very straightforward:
- Create a new C# class library project in Visual Studio 2010 called SampleContext.
- Ensure that the Framework type is set to .NET Framework 4 (and NOT .NET Framework 4 Client Profile)
- Add a reference to Microsoft.ComplexEventProcessing.dll
- Add a reference to Microsoft.ComplexEventProcessing.Observable.dll
- This is required for the IObservable<T>.ToPointStream extension method.
- Add a reference to Microsoft.ComplexEventProcessing.Adapters.dll
- Optional – this is only required if you have an adapter embedded inside of your context
- Add a reference to System.Reactive.dll
- Optional – this is only required for creating different types of IObservable streams.
- Note: System.Reactive.dll is part of the Reactive Extensions framework, which needs to be downloaded separately.
- Derive a class from StreamInsightContext in the StreamInsightLinqPad.Samples assembly (available as part of the LINQPad driver for StreamInsight download).
- Expose your stream as a property of the class.
Download the sample project here. |
That’s it! Here’s what this looks like in practice. First we establish the basic class
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using StreamInsightLinqPad.Samples;
- using Microsoft.ComplexEventProcessing;
- using Microsoft.ComplexEventProcessing.Linq;
- using StreamInsight.Samples.Adapters.DataGenerator;
- /// <summary>
- /// Define a sample LINQPad context for StreamInsight
- /// </summary>
- public class SampleContext : StreamInsightContext
- {
- public SampleContext(Server server)
- : base(server)
- { }
- }
Now we’ll go ahead and expose three types of streams. The first will be one generated from an IEnumerable.
- SimpleEvent[] events = new SimpleEvent[]
- {
- new SimpleEvent { Timestamp = new DateTime(2011, 1, 5, 12, 0, 0), ID = 5, Message = "Test" },
- new SimpleEvent { Timestamp = new DateTime(2011, 1, 5, 13, 0, 0), ID = 6, Message = "Test2" },
- new SimpleEvent { Timestamp = new DateTime(2011, 1, 5, 14, 0, 0), ID = 7, Message = "Test3" },
- new SimpleEvent { Timestamp = new DateTime(2011, 1, 5, 15, 0, 0), ID = 5, Message = "Test4" },
- };
- /// <summary>
- /// Expose a stream created from an IEnumerable
- /// </summary>
- public CepStream<SimpleEvent> SimpleStream
- {
- get
- {
- return events.ToPointStream(this.Application, t =>
- PointEvent.CreateInsert<SimpleEvent>(t.Timestamp, t),
- AdvanceTimeSettings.IncreasingStartTime);
- }
- }
Next, we’ll use IObservable (created via Observer.Interval) to generate a “live” stream of events:
- /// <summary>
- /// Expose a stream created from an IObservable
- /// </summary>
- public CepStream<SimpleEvent> ObservableStream
- {
- get
- {
- var rand = new Random();
- // Create a simple observable that returns a random event every
- // 250 ms
- var interval = Observable.Interval(TimeSpan.FromMilliseconds(250))
- .Select(i => new SimpleEvent
- {
- ID = rand.Next(10),
- Timestamp = DateTime.UtcNow,
- Message = "Observable message!"
- });
- return interval.ToPointStream(Application, s =>
- PointEvent.CreateInsert(s.Timestamp, s),
- AdvanceTimeSettings.IncreasingStartTime,
- null);
- }
- }
Finally, we’ll leverage the DataGenerator sample adapter (from https://streaminsight.codeplex.com/) to demonstrate creating a LINQPad stream from an adapter.
- /// <summary>
- /// Expose a stream created from an adapter instance
- /// </summary>
- public CepStream<GeneratedEvent> AdapterStream
- {
- get
- {
- var generatorConfig = new GeneratorConfig()
- {
- CtiFrequency = 1,
- DeviceCount = 3,
- EventInterval = 250,
- EventIntervalVariance = 10,
- MaxValue = 100
- };
- var inputStream = CepStream<GeneratedEvent>.Create(Application,
- "inputStream", typeof(GeneratorFactory), generatorConfig,
- EventShape.Point);
- return inputStream;
- }
- }
To test, we’ll need to configure LINQPad with our assembly as a data context. To do this:
- Add Connection, then select Microsoft StreamInsight from the list of drivers.
- Select Custom Context from the Context Kind dropdown.
- Click on the ellipsis for Assembly Path (...), then browse to the directory containing SampleContext.dll.
- Click OK to finish selecting the new context.
The context streams should now be visible in LINQPad, similar to the screenshot below. Create a new Query, and select C# Statements as the Language, and StreamInsight: SampleContext as the database.
Let’s dump our streams, using this LINQ code:
- // Dump out the enumerable stream. Since this is an IEnumerable, it
- // will eventually complete and return
- SimpleStream.ToEnumerable().Dump("Enumerable stream");
- // Dump out the observable stream. As this is an IObservable without
- // an end condition, we will have to stop the query (as it will not
- // stop on its own)
- ObservableStream.ToObservable().Dump("Observable stream");
This will show us our enumerable and observable streams:
Finally, let’s dump out some data from our adapter stream:
- // Dump out the adapter stream. As this is an adapter without
- // an end condition, we will have to stop the query (as it will not
- // stop on its own)
- AdapterStream.ToObservable().Dump("Adapter stream");
Resulting in:
Note – I had to make one change to the adapter code to make it work with ToObservable. The GeneratedEvent definition doesn’t define a default constructor (which is needed by ToObservable). In GeneratedEvent.cs, on line 23 I added the code:
|
That’s it – a LINQPad data context in a nutshell.