다음을 통해 공유


StreamInsight手札(二)——构建HelloInsight程序

本文将详细介绍如何建立一个简单的StreamInsight程序——HelloInsight。这个程序接受来自输入适配器的字符串输入,并通过输出适配器输出。

1、安装StreamInsight和Visual Studio 2010。

2、新建一个新的工程。可以选择.Net Framework 3.5 SP1或.Net Framework 4。

3、添加引用。引用dll的位置在<your driver>\program files\microsoft streamInsight1.1\C:\Program Files\Microsoft StreamInsight 1.1\Bin。

4、在program.cs using添加到引用: 

using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Adapters;
using Microsoft.ComplexEventProcessing.Diagnostics;
using Microsoft.ComplexEventProcessing.Linq;
using Microsoft.ComplexEventProcessing.ManagementService;

5、Main函数创建CEP服务器。DefaultInstance是StreamInsight安装时指定的实例名。

using (Server server = Server.Create("DefaultInstance"))
{
    Application application = server.CreateApplication("HelloInsight");

6、创建事件负载(payload)。事件的负载是输入流里面需要CEP服务器处理的数据。事件负载不能是用户定义类型。

namespace HelloInsight
{
    public class HelloPayload
    {
        public string str { get; set; }
    }

7、创建输入输出适配器(Adapter)。输入输出适配器的结构基本一致,这里重点介绍输入适配器,输出适配器与之类似。适配器主要包括:配置类、工厂类和输入/输出类。配置类一般处理输入流的参数,比如流文件的名称等。工厂类用来实现输入、输出适配器的基类。输入、输出类处理数据流并生成可以供CEP服务器处理的事件。

HelloInsight这个例子里将输入的字符串作为参数定义配置类: 

namespace HelloInsight.InputAdapters
{
    public class HelloInputConfig
    {
        public string inputString { get; set; }
    }
}

工厂类定义如下:

using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Adapters;

namespace HelloInsight.InputAdapters
{
    public class HelloInputFactory : ITypedInputAdapterFactory<HelloInputConfig>
    {
        public InputAdapterBase Create<TPayload>(HelloInputConfig configInfo, EventShape eventShape)
        {
            // All the events are of point type, for this demo
            if (eventShape == EventShape.Point)
                return new HelloPointInput(configInfo);
            else
                return default(InputAdapterBase);
        }
        public void Dispose()
        {
        }
    }
}

输入类定义如下:

using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Adapters;

namespace HelloInsight.InputAdapters
{
    public class HelloPointInput : TypedPointInputAdapter<HelloPayload>
    {
        private HelloInputConfig _config;

        public HelloPointInput(HelloInputConfig config)
        {
            _config = config;
        }

        public override void Start()
        {
            ProduceEvents();
        }

        public override void Resume()
        {
            ProduceEvents();
        }

        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);
        }

        private void ProduceEvents()
        {
            var pendingEvent = CreateInsertEvent();
            pendingEvent.StartTime = DateTime.Now;
            pendingEvent.Payload = new HelloPayload
            {
                str = _config.inputString
            };

EnqueueOperationResult result = Enqueue(ref pendingEvent);
           EnqueueCtiEvent(DateTime.Now);

            Stopped();
        }
    }
}

输出类定义如下: 

using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Adapters;

namespace HelloInsight.OutputAdapters
{
    public class HelloPointOutput : TypedPointOutputAdapter<HelloPayload>
    {
        public HelloPointOutput(HelloOutputConfig config)
        {
        }

        public override void Start()
        {
            ConsumeEvents();
        }

        public override void Resume()
        {
            ConsumeEvents();
        }

        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);
        }

        private void ConsumeEvents()
        {
            PointEvent<HelloPayload> currEvent;
            DequeueOperationResult result;

            while (true)
            {
                if (AdapterState.Running == AdapterState)
                {
                    result = Dequeue(out currEvent);
                    if (result == DequeueOperationResult.Empty)
                    {
                        Ready();
                        return;
                    }
                    else
     {
                        if (currEvent.EventKind == EventKind.Insert)
                        {
                            Console.WriteLine("Output: " +
                            currEvent.Payload.str
                            );
                        }
                        ReleaseEvent(ref currEvent);
                    }
                }
                else if (AdapterState.Stopping == AdapterState)
                {
                    Stopped();
                }
       else
                {
                    return;
                }
            }
        }
    }
}

8、定义CEP服务器上的查询。在HelloInsight这个例子里,使用最简单的查询,即查询出输入流的全部输入。查询使用LINQ。

private static void DirectOutput(CepStream<HelloPayload> cepStream, Application application)
{
    // Return only "Close" values using a where-clause
    var filteredCepStream = from e in cepStream
                            select e;

    RunQuery(filteredCepStream, application, "directly output the input data");
}

private static void RunQuery(CepStream<HelloPayload> cepStream, Application application, string queryDescription)
{
    // Configure output adapter
    var outputConfig = new HelloOutputConfig();

    // Create query and bind to the output adapter
    var query = cepStream.ToQuery(application, Guid.NewGuid().ToString(), queryDescription, typeof(HelloOutputFactory), outputConfig, EventShape.Point, StreamEventOrder.ChainOrdered);

    // Start query
    query.Start();

    // Wait until query change state
    DiagnosticView diagnosticView;

    do
    {
        Thread.Sleep(100);
        diagnosticView = query.Application.Server.GetDiagnosticView(query.Name);
    } while ((string)diagnosticView[DiagnosticViewProperty.QueryState] == "Running");

    // Stop query
    query.Stop();
}

这里,除了加亮部分县市的创建query的方法以外,用户还可以如下创建query:

var queryTemplate = application.CreateQueryTemplate("ExampleTemplate", "Description...", cepStream);
var queryBinder = new QueryBinder(queryTemplate);
queryBinder.BindProducer<StockQuote>("input", inputAdapter, inputConfig, EventShape.Point);
queryBinder.AddConsumer<StockQuote>("output", outputAdapter, outputConfig, EventShape.Point, StreamEventOrder.ChainOrdered);

var query = application.CreateQuery("ExampleQuery", "Description...", queryBinder);

9、Main函数里调用输入输出和查询。

CepStream<HelloPayload> input = CepStream<HelloPayload>.Create("input");

var inputConfig = new HelloInputConfig
{
    inputString = "Hello StreamInsight!"
};

var outputConfig = new HelloOutputConfig();
var inputAdapter = application.CreateInputAdapter<HelloInputFactory>("HelloInput", "Description");
var outputAdapter = application.CreateOutputAdapter<HelloOutputFactory>("HelloOutput", "Description");
var inputStream = CepStream<HelloPayload>.Create("InputStream", typeof(HelloInputFactory), inputConfig, EventShape.Point);
DirectOutput(inputStream, application);

10、运行程序,输入的字符串输出到控制台。


 
至此,HelloInsight的程序创建完成。通过修改输入/输出适配器和查询,用户可以在此基础上实现更加复杂的程序了。

软件测试工程师 金晶

Comments

  • Anonymous
    March 10, 2013
    非常感谢您的分享,我刚开始接触StreamInsight,您写的一些内容我看得不是很明白,请问能否把您的这个示例的项目程序发送给我?我的邮箱是674544686@qq.com,非常期待您的回音。

  • Anonymous
    December 14, 2013
    感觉比较复杂