StreamInsight手札(二)——构建HelloInsight程序
本文将详细介绍如何建立一个简单的StreamInsight程序——HelloInsight。这个程序接受来自输入适配器的字符串输入,并通过输出适配器输出。
1、安装StreamInsight和Visual Studio 2010。
2、新建一个新的工程。可以选择.Net Framework 3.5 SP1或.Net Framework 4。
3、添加引用。引用dll的位置在<your driver>\program files\microsoft streamInsight1.1\C:\Program Files\Microsoft StreamInsight 1.1\Bin。
4、在program.cs using添加到引用:
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Adapters;
using Microsoft.ComplexEventProcessing.Diagnostics;
using Microsoft.ComplexEventProcessing.Linq;
using Microsoft.ComplexEventProcessing.ManagementService;
5、Main函数创建CEP服务器。DefaultInstance是StreamInsight安装时指定的实例名。
using (Server server = Server.Create("DefaultInstance"))
{
Application application = server.CreateApplication("HelloInsight");
}
6、创建事件负载(payload)。事件的负载是输入流里面需要CEP服务器处理的数据。事件负载不能是用户定义类型。
namespace HelloInsight
{
public class HelloPayload
{
public string str { get; set; }
}
}
7、创建输入输出适配器(Adapter)。输入输出适配器的结构基本一致,这里重点介绍输入适配器,输出适配器与之类似。适配器主要包括:配置类、工厂类和输入/输出类。配置类一般处理输入流的参数,比如流文件的名称等。工厂类用来实现输入、输出适配器的基类。输入、输出类处理数据流并生成可以供CEP服务器处理的事件。
HelloInsight这个例子里将输入的字符串作为参数定义配置类:
namespace HelloInsight.InputAdapters
{
public class HelloInputConfig
{
public string inputString { get; set; }
}
}
工厂类定义如下:
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Adapters;
namespace HelloInsight.InputAdapters
{
public class HelloInputFactory : ITypedInputAdapterFactory<HelloInputConfig>
{
public InputAdapterBase Create<TPayload>(HelloInputConfig configInfo, EventShape eventShape)
{
// All the events are of point type, for this demo
if (eventShape == EventShape.Point)
return new HelloPointInput(configInfo);
else
return default(InputAdapterBase);
}
public void Dispose()
{
}
}
}
输入类定义如下:
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Adapters;
namespace HelloInsight.InputAdapters
{
public class HelloPointInput : TypedPointInputAdapter<HelloPayload>
{
private HelloInputConfig _config;
public HelloPointInput(HelloInputConfig config)
{
_config = config;
}
public override void Start()
{
ProduceEvents();
}
public override void Resume()
{
ProduceEvents();
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
}
private void ProduceEvents()
{
var pendingEvent = CreateInsertEvent();
pendingEvent.StartTime = DateTime.Now;
pendingEvent.Payload = new HelloPayload
{
str = _config.inputString
};
EnqueueOperationResult result = Enqueue(ref pendingEvent);
EnqueueCtiEvent(DateTime.Now);
Stopped();
}
}
}
输出类定义如下:
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Adapters;
namespace HelloInsight.OutputAdapters
{
public class HelloPointOutput : TypedPointOutputAdapter<HelloPayload>
{
public HelloPointOutput(HelloOutputConfig config)
{
}
public override void Start()
{
ConsumeEvents();
}
public override void Resume()
{
ConsumeEvents();
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
}
private void ConsumeEvents()
{
PointEvent<HelloPayload> currEvent;
DequeueOperationResult result;
while (true)
{
if (AdapterState.Running == AdapterState)
{
result = Dequeue(out currEvent);
if (result == DequeueOperationResult.Empty)
{
Ready();
return;
}
else
{
if (currEvent.EventKind == EventKind.Insert)
{
Console.WriteLine("Output: " +
currEvent.Payload.str
);
}
ReleaseEvent(ref currEvent);
}
}
else if (AdapterState.Stopping == AdapterState)
{
Stopped();
}
else
{
return;
}
}
}
}
}
8、定义CEP服务器上的查询。在HelloInsight这个例子里,使用最简单的查询,即查询出输入流的全部输入。查询使用LINQ。
private static void DirectOutput(CepStream<HelloPayload> cepStream, Application application)
{
// Return only "Close" values using a where-clause
var filteredCepStream = from e in cepStream
select e;
RunQuery(filteredCepStream, application, "directly output the input data");
}
private static void RunQuery(CepStream<HelloPayload> cepStream, Application application, string queryDescription)
{
// Configure output adapter
var outputConfig = new HelloOutputConfig();
// Create query and bind to the output adapter
var query = cepStream.ToQuery(application, Guid.NewGuid().ToString(), queryDescription, typeof(HelloOutputFactory), outputConfig, EventShape.Point, StreamEventOrder.ChainOrdered);
// Start query
query.Start();
// Wait until query change state
DiagnosticView diagnosticView;
do
{
Thread.Sleep(100);
diagnosticView = query.Application.Server.GetDiagnosticView(query.Name);
} while ((string)diagnosticView[DiagnosticViewProperty.QueryState] == "Running");
// Stop query
query.Stop();
}
这里,除了加亮部分县市的创建query的方法以外,用户还可以如下创建query:
var queryTemplate = application.CreateQueryTemplate("ExampleTemplate", "Description...", cepStream);
var queryBinder = new QueryBinder(queryTemplate);
queryBinder.BindProducer<StockQuote>("input", inputAdapter, inputConfig, EventShape.Point);
queryBinder.AddConsumer<StockQuote>("output", outputAdapter, outputConfig, EventShape.Point, StreamEventOrder.ChainOrdered);
var query = application.CreateQuery("ExampleQuery", "Description...", queryBinder);
9、Main函数里调用输入输出和查询。
CepStream<HelloPayload> input = CepStream<HelloPayload>.Create("input");
var inputConfig = new HelloInputConfig
{
inputString = "Hello StreamInsight!"
};
var outputConfig = new HelloOutputConfig();
var inputAdapter = application.CreateInputAdapter<HelloInputFactory>("HelloInput", "Description");
var outputAdapter = application.CreateOutputAdapter<HelloOutputFactory>("HelloOutput", "Description");
var inputStream = CepStream<HelloPayload>.Create("InputStream", typeof(HelloInputFactory), inputConfig, EventShape.Point);
DirectOutput(inputStream, application);
10、运行程序,输入的字符串输出到控制台。
至此,HelloInsight的程序创建完成。通过修改输入/输出适配器和查询,用户可以在此基础上实现更加复杂的程序了。
软件测试工程师 金晶
Comments
Anonymous
March 10, 2013
非常感谢您的分享,我刚开始接触StreamInsight,您写的一些内容我看得不是很明白,请问能否把您的这个示例的项目程序发送给我?我的邮箱是674544686@qq.com,非常期待您的回音。Anonymous
December 14, 2013
感觉比较复杂