Using Observables as Event Sources

[This topic is pre-release documentation and is subject to change in future releases. Blank topics are included as placeholders.]

As an alternative to using input and output adapters, you can use the IObservable and IObserver interfaces as the producers and consumers of event sources and sinks. The IObservable/IObserver development model ties CEP event production and consumption more tightly into the CLR model of querying over enumerations of events by using LINQ. The event source is defined as an IObservable object. The CEP query acts as an IObserver of this source and presents its result as an IObservable object to the event sink, which is in turn an IObserver object.

Example

The following example demonstrates the process for using the IObservable and IObserver interfaces. It assumes the following event type definition:

    public class MyPayload
    {
        public double value;
        public MyPayload() { }
        public MyPayload(double i) { value = i; }
    }

First, the input source is modeled as an IObservable object. The namespace Microsoft.ComplexEventProcessing.Adapters.Observable provides various extension methods to accomplish this task. For example, the object can be created from an IEnumerable object such as a System.Collections.Generic.List.

        List<MyPayload> numberSeries = new List<MyPayload>();
        for (int i = 0; i <= 20; i++)
            numberSeries.Add(new MyPayload(i));

        IObservable<MyPayload> observableInput = numberSeries.ToObservable();

The extension method ToObservable() wraps the IEnumerable in an IObservable object, which can then be converted into a CEP stream and queried:

        var input = observableInput.ToCepStream(e => DateTime.Now.Add(TimeSpan.FromSeconds(e.value)));

        var result = from e in input
                     select new MyPayload { value = e.value % 2 };
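To make the two lambdas above concrete, the following sketch applies the same timestamp mapping and projection to the list with plain LINQ to Objects, outside of any CEP server. It is only an illustration under stated assumptions: the class ProjectionSketch, the method Project, and the fixed baseTime (substituted for DateTime.Now so that the result is repeatable) are hypothetical names and are not part of the StreamInsight API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class MyPayload
{
    public double value;
    public MyPayload() { }
    public MyPayload(double i) { value = i; }
}

// Hypothetical helper for illustration; not part of StreamInsight.
public static class ProjectionSketch
{
    // Pairs each payload with the timestamp the lambda would assign (Key)
    // and the value the select clause would produce (Value).
    // baseTime stands in for DateTime.Now.
    public static List<KeyValuePair<DateTime, double>> Project(
        IEnumerable<MyPayload> source, DateTime baseTime)
    {
        return (from e in source
                select new KeyValuePair<DateTime, double>(
                    baseTime.Add(TimeSpan.FromSeconds(e.value)),
                    e.value % 2)).ToList();
    }

    public static void Main()
    {
        var numberSeries = new List<MyPayload>();
        for (int i = 0; i <= 20; i++)
            numberSeries.Add(new MyPayload(i));

        DateTime baseTime = new DateTime(2009, 3, 24, 16, 0, 0);
        foreach (var pair in Project(numberSeries, baseTime))
            Console.WriteLine("{0:HH:mm:ss} -> {1}", pair.Key, pair.Value);
    }
}
```

For the input 0 through 20, each event is stamped that many seconds after the base time, and the projected payload values alternate between 0 and 1.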

The resulting stream is then encapsulated in an IObservable object.

        var queryOutput = result.ToObservable<MyPayload>();
        IDisposable disposable = queryOutput.Subscribe(new ObserveToConsole<MyPayload>());

The query is implicitly started as soon as the first observer subscribes to it. In this example, the observer simply writes the result events to the console window:

    // Requires: using System; using System.IO; using System.Reflection;
    public class ObserveToConsole<T> : IObserver<T>
    {
        private StreamWriter _writer;
        FieldInfo[] _fieldInfos = typeof(T).GetFields();

        public ObserveToConsole()
        {
            _writer = new StreamWriter(Console.OpenStandardOutput());
        }

        public void OnCompleted()
        {
            _writer.Flush();
            _writer.Close();
        }

        public void OnError(Exception error)
        {
            // Errors are ignored in this example.
        }

        public void OnNext(T value)
        {
            for (int fieldNo = 0; fieldNo < _fieldInfos.Length; fieldNo++)
            {
                _writer.Write(_fieldInfos[fieldNo].GetValue(value).ToString());
                if (fieldNo != _fieldInfos.Length - 1)
                {
                    _writer.Write(", ");
                }
            }
            _writer.WriteLine();
        }
    }
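The observer above only consumes events. To see both sides of the contract in isolation, the following sketch pairs a hypothetical enumerable-backed source with a collecting observer, using only the IObservable<T> and IObserver<T> interfaces from the .NET base class library. EnumerableObservable, CollectingObserver, and ObserverSketch are illustrative names, and this is not how the StreamInsight ToObservable() method is implemented. In particular, the sketch pushes all items synchronously inside Subscribe and omits error handling and cancellation.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an enumerable-backed source (assumption, not StreamInsight code).
public class EnumerableObservable<T> : IObservable<T>
{
    private readonly IEnumerable<T> _items;

    public EnumerableObservable(IEnumerable<T> items) { _items = items; }

    // Pushes every item to the observer, then signals completion.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (T item in _items)
            observer.OnNext(item);
        observer.OnCompleted();
        return new NoOpSubscription();
    }

    // Nothing remains to cancel once Subscribe returns.
    private sealed class NoOpSubscription : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer that records what it receives.
public class CollectingObserver<T> : IObserver<T>
{
    public readonly List<T> Items = new List<T>();
    public bool Completed { get; private set; }
    public Exception Error { get; private set; }

    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { Error = error; }
    public void OnCompleted() { Completed = true; }
}

public static class ObserverSketch
{
    public static void Main()
    {
        var source = new EnumerableObservable<int>(new[] { 1, 2, 3 });
        var sink = new CollectingObserver<int>();
        using (source.Subscribe(sink))
        {
            Console.WriteLine("received {0} items, completed = {1}",
                sink.Items.Count, sink.Completed);
        }
    }
}
```

Because this source delivers everything during Subscribe, the returned IDisposable has nothing left to cancel. A source that delivers events over time would typically use it to stop delivery to that observer.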

The ToCepStream() call in the first example used one specific version of the method, which creates a stream of point events with explicitly set timestamps. There, the lambda expression mapped one of the type's fields to the CEP event timestamp. Another version of the ToCepStream() method allows the user to specify an entire type mapping, as shown in the following example. With this version, the user can flexibly specify the type of event (insert or CTI), the timestamps, and the payload construction.

    var input = from e in observableInput.ToCepStream(e =>
                    new CepPointEvent<MyEventLogEntry>
                    {
                        EventKind = Microsoft.ComplexEventProcessing.EventKind.Insert,
                        StartTime = e.TimeGenerated,
                        Payload = new MyEventLogEntry
                        {
                            Category = e.Category.Substring(0, Math.Min(maxStringLength, e.Category.Length)),
                            CategoryNumber = e.CategoryNumber,
                            Data = e.Data,
                            EntryType = (int)e.EntryType,
                            Index = e.Index,
                            InstanceId = e.InstanceId,
                            MachineName = null == e.MachineName ? null : e.MachineName.Substring(0, Math.Min(maxStringLength, e.MachineName.Length)),
                            Message = null == e.Message ? null : e.Message.Substring(0, Math.Min(maxStringLength, e.Message.Length)),
                            Source = null == e.Source ? null : e.Source.Substring(0, Math.Min(maxStringLength, e.Source.Length)),
                            TimeGenerated = e.TimeGenerated,
                            TimeWritten = e.TimeWritten,
                            UserName = null == e.UserName ? null : e.UserName.Substring(0, Math.Min(maxStringLength, e.UserName.Length)),
                        }
                    } )
                select e;

All versions of the ToCepStream() method also accept advance time settings (an AdvanceTimeSettings object), as shown in the following example. The example uses a simple array as input event data. For more information, see Creating and Binding Query Templates.

    StockTicks[] mylist = new StockTicks[]
    {
        new StockTicks("3/24/2009 4:05:00 PM", "0", 300, -5),
        new StockTicks("3/24/2009 4:06:00 PM", "0", 303, 3),
        new StockTicks("3/24/2009 4:07:00 PM", "0", 297, -6),
        new StockTicks("3/24/2009 4:08:00 PM", "0", 289, -8)
    };
    AdvanceTimeSettings ats = new AdvanceTimeSettings(1, TimeSpan.FromSeconds(0), AdvanceTimePolicy.Drop);

    var inputStream = mylist.ToObservable().ToCepStream(e => e.ts, ats);

See Also

Concepts

IObservable/IObserver Development Model