StreamInsight手札(三)——使用IObservable接口创建StreamInsight程序
本文将介绍如何利用IObservable接口创建StreamInsight应用程序。
Observer是.net Framework 4中引入的开发模式。IObserver<T>和IObservable<T>接口为基于推送的通知提供通用机制,也称为观察者设计模式。IObservable<T> 接口表示发送通知(提供程序)的类;IObserver<T> 接口表示接收通知(观察器)的类。
在下列应用场景下,适合采用Observable模式:
- 历史数据查询。对有限长的大数据集进行临时的查询。
- Ad hoc查询。对数据进行一系列交互的ad hoc查询。
- 内嵌于用户程序。StreamInsight内嵌于用户应用程序之中。
使用这种开发模型,开发者只需要:
- 实现IObservable接口
- 实现IObserver接口
- 绑定到查询
下面将用示例代码来说明.。
1、安装Reactive Extension for .Net 4:
2、新建一个工程,注意选择.net framework 4.0。
3、添加如下引用。System.CoreEx和System.Reactive在C:\Program Files\Microsoft Cloud Programmability\Reactive Extensions\v1.0.2856.0\Net4\目录下。
定义事件Payload:
namespace HelloInsightObservable
{
public class HelloPayload
{
public int value { get; set; }
}
}
5、定义InputObservable类实现Iobservable接口
public class InputObservable : IObservable<int>
这个类需要实现Subscribe方法:
public IDisposable Subscribe(IObserver<int> observer)
{
if (observer != null && !this.observers.Contains(observer))
{
this.observers.Add(observer);
}
return observer as IDisposable;
}
我们还需要在这个类里模拟输入。我们定义了一个Timer,当定时器时间到时,调用定时器的回调函数生成一个随机整数,并调用observer的OnNext方法推送数据。
private void GenerateInput(object _)
{
foreach (var observer in observers)
{
int value = random.Next(100);
Console.WriteLine("Random generated data {0} : {1}", generatedNumber, value);
observer.OnNext(value);
generatedNumber++;
if (generatedNumber >= dataNumber)
{
observer.OnCompleted();
timer.Change(Timeout.Infinite, timeSpan);
return;
}
}
timer.Change(timeSpan, timeSpan);
}
6、将Observable类作为CEP引擎的输入。
在Main函数中添加如下代码。注意加亮部分的代码,这里与适配器方式的程序不同的是,没有插入CTI事件。
var inputObservable = new InputObservable(10);
using (var server = Server.Create("DefaultInstance"))
{
var application = server.CreateApplication("Observable Application");
var stream = inputObservable.ToPointStream(application,
e => PointEvent.CreateInsert(DateTime.Now, new HelloPayload { value = e }),
AdvanceTimeSettings.StrictlyIncreasingStartTime,
"Observable Stream");
var query = from e in stream
where e.value > 50
select e;
}
7、定义OutputObserver类实现Iobserver接口。
class OutputObserver : IObserver<HelloPayload>
{
public virtual void OnCompleted()
{
Console.WriteLine("Stopping query...");
}
public virtual void OnError(Exception e)
{
Console.WriteLine("Unexpected error occured");
}
public virtual void OnNext(HelloPayload payload)
{
Console.WriteLine("DataOutput: {0}", payload.value);
}
}
8、将OutputObserver绑定到query。
在main函数中添加如下代码:
var outputObserver = new OutputObserver();
var outputObservable = query.ToObservable();
outputObservable.Subscribe(outputObserver);
Console.ReadLine();
Subscribe函数执行后,就会开始接收数据输入,并调用OnNext方法输出。注意最后一句话的作用是让程序继续运行直至数据输入完成,Observable调用Observer的OnComplete方法,或者用户中途通过输入任意字符停止Observer。
9、程序运行的结果如下:
软件测试工程师 金晶