Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
本文将介绍如何利用IObservable接口创建StreamInsight应用程序。
Observer是.net Framework 4中引入的开发模式。IObserver<T>和IObservable<T>接口为基于推送的通知提供通用机制,也称为观察者设计模式。IObservable<T> 接口表示发送通知(提供程序)的类;IObserver<T> 接口表示接收通知(观察器)的类。
在下列应用场景下,适合采用Observable模式:
- 历史数据查询。对有限长的大数据集进行临时的查询。
- Ad hoc查询。对数据进行一系列交互的ad hoc查询。
- 内嵌于用户程序。StreamInsight内嵌于用户应用程序之中。
使用这种开发模型,开发者只需要:
- 实现IObservable接口
- 实现IObserver接口
- 绑定到查询
下面将用示例代码来说明.。
1、安装Reactive Extension for .Net 4:

2、新建一个工程,注意选择.net framework 4.0。
3、添加如下引用。System.CoreEx和System.Reactive在C:\Program Files\Microsoft Cloud Programmability\Reactive Extensions\v1.0.2856.0\Net4\目录下。

定义事件Payload:
namespace HelloInsightObservable
{
public class HelloPayload
{
public int value { get; set; }
}
}
5、定义InputObservable类实现Iobservable接口
public class InputObservable : IObservable<int>
这个类需要实现Subscribe方法:
public IDisposable Subscribe(IObserver<int> observer)
{
if (observer != null && !this.observers.Contains(observer))
{
this.observers.Add(observer);
}
return observer as IDisposable;
}
我们还需要在这个类里模拟输入。我们定义了一个Timer,当定时器时间到时,调用定时器的回调函数生成一个随机整数,并调用observer的OnNext方法推送数据。
private void GenerateInput(object _)
{
foreach (var observer in observers)
{
int value = random.Next(100);
Console.WriteLine("Random generated data {0} : {1}", generatedNumber, value);
observer.OnNext(value);
generatedNumber++;
if (generatedNumber >= dataNumber)
{
observer.OnCompleted();
timer.Change(Timeout.Infinite, timeSpan);
return;
}
}
timer.Change(timeSpan, timeSpan);
}
6、将Observable类作为CEP引擎的输入。
在Main函数中添加如下代码。注意加亮部分的代码,这里与适配器方式的程序不同的是,没有插入CTI事件。
var inputObservable = new InputObservable(10);
using (var server = Server.Create("DefaultInstance"))
{
var application = server.CreateApplication("Observable Application");
var stream = inputObservable.ToPointStream(application,
e => PointEvent.CreateInsert(DateTime.Now, new HelloPayload { value = e }),
AdvanceTimeSettings.StrictlyIncreasingStartTime,
"Observable Stream");
var query = from e in stream
where e.value > 50
select e;
}
7、定义OutputObserver类实现Iobserver接口。
class OutputObserver : IObserver<HelloPayload>
{
public virtual void OnCompleted()
{
Console.WriteLine("Stopping query...");
}
public virtual void OnError(Exception e)
{
Console.WriteLine("Unexpected error occured");
}
public virtual void OnNext(HelloPayload payload)
{
Console.WriteLine("DataOutput: {0}", payload.value);
}
}
8、将OutputObserver绑定到query。
在main函数中添加如下代码:
var outputObserver = new OutputObserver();
var outputObservable = query.ToObservable();
outputObservable.Subscribe(outputObserver);
Console.ReadLine();
Subscribe函数执行后,就会开始接收数据输入,并调用OnNext方法输出。注意最后一句话的作用是让程序继续运行直至数据输入完成,Observable调用Observer的OnComplete方法,或者用户中途通过输入任意字符停止Observer。
9、程序运行的结果如下:

软件测试工程师 金晶