StreamInsight手札(四)——使用IEnumerable接口创建StreamInsight程序
本文将介绍如何利用IEnumerable接口创建StreamInsight应用程序。
IEnumerable与IObservable最大的区别在于IEnumerable是采用Pull模式获取数据,而IObservable则是用Push模式推送数据。
用户首先通过IEnumerable.GetEnumerator来获得数据源的Enumerator,然后通过调用IEnumerator.MoveNext和IEnumerator.Current来获取数据。StreamInsight的输入输出通过一个数据队列连接。输入适配器获得数据,并调用Enqueue将数据放入队列,输出适配器循环检测队列是否为空,如果不是空队列,则调用Dequeue将数据取出。
在HelloInsight程序基础上,可以通过如下修改来实现IEnumerable接口。
1、获取输入数据源的Enumerator(HelloPointInput.cs):
public HelloPointInput(HelloInputConfig config)
{
_config = config;
var streamReader = new StreamReader(config.fileName);
strings = new List<string>();
while (!streamReader.EndOfStream)
{
strings.Add(streamReader.ReadLine());
}
stringEnumerator = strings.GetEnumerator();
streamReader.Close();
}
2、读取Enumerator中的数据,并放入队列。注意判断队满的情况(HelloPointInput.cs):
private void ProduceEvents()
{
while (AdapterState != AdapterState.Stopping)
{
if (stringEnumerator.MoveNext())
{
try
{
var line = stringEnumerator.Current;
var pendingEvent = CreateInsertEvent();
pendingEvent.StartTime = DateTime.Now;
pendingEvent.Payload = new HelloPayload
{
str = line
};
EnqueueOperationResult result;
result = Enqueue(ref pendingEvent);
if (result == EnqueueOperationResult.Full)
{
Thread.Sleep(1000);
//Ready();
return;
}
EnqueueCtiEvent(DateTime.Now);
Thread.Sleep(1000);
}
catch
{
//error handling should go here
}
}
else
{
break;
}
}
Stopped();
}
注意:在IEnumerator模式下,CTI事件是必须的,否则不会产生数据输出。
3、循环读取队列中的数据,并进行输出处理(HelloPointOutput.cs):
private void ConsumeEvents()
{
PointEvent<HelloPayload> currEvent;
DequeueOperationResult result;
while (AdapterState != AdapterState.Stopping)
{
result = Dequeue(out currEvent);
if (result == DequeueOperationResult.Empty)
{
Ready();
return;
}
else
{
if (currEvent.EventKind == EventKind.Insert)
{
Console.WriteLine("Output: " +
currEvent.Payload.str
);
}
ReleaseEvent(ref currEvent);
}
}
Stopped();
}
4、运行程序
建立测试文件:
将输入数据源指向测试文件的存储地址。运行程序,得到如下输出:
软件测试工程师 金晶