StreamInsight 端到端示例
本主题介绍在创建 StreamInsight 应用程序时涉及的不同组件和步骤并且包括应用程序的一个端到端示例。StreamInsight 应用程序将事件源、事件接收器和查询组合在一起以便实现复杂的事件处理方案。StreamInsight API 提供多种不同的接口以便在创建和维护事件处理应用程序时支持不同级别的控制和复杂性。
应用程序部署的最小单位是可以启动和停止的“查询”。下图显示了用于生成查询的一个方法。事件源由“输入适配器”表示。该适配器将事件流馈送到运算符树中,该树表示设计器以“查询模板”的形式指定的期望的查询逻辑。然后,处理后的事件流将进入事件接收器中,该接收器通常是“输出适配器”。
还不太熟悉复杂事件处理术语的开发人员应阅读 StreamInsight 服务器概念和 StreamInsight 服务器体系结构。
应用程序进程
本节分步介绍创建端到端应用程序的一般过程。
实例化服务器实例和应用程序
该过程从实例化 StreamInsight 服务器实例和应用程序开始。
server = Server.Create(”MyInstance”);
Application myApp = server.CreateApplication("MyApp");
必须通过在 StreamInsight 安装过程在计算机上注册的实例名称(在前面的示例中,该名称为 MyInstance)创建服务器。有关详细信息,请参阅安装 (StreamInsight)。
“应用程序”表示服务器中包含其他元数据实体的范围单位。
前面的例子中以相同的过程中创建了一个服务器实例。不过,另一个常见的部署是连接到远程服务器并且在远程服务器处理某一现有应用程序。下例说明如何连接到远程服务器和访问现有应用程序。
server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/StreamInsight/MyInstance"));
Application myApp = server.Applications["ExistingApp"];
有关本地和远程服务器的详细信息,请参阅发布到和连接到 StreamInsight 服务器。
创建输入流
接下来,在现有适配器实现的基础上创建一个输入流。确切地说,必须如下例中所示指定适配器工厂。
var inputstream = CepStream<MyDataType>.Create("inputStream",
typeof(MyInputAdapterFactory),
new InputAdapterConfig { someFlag = true },
EventShape.Point);
这将创建一个 CepStream 对象,并且表示一个事件流,该流由通过给定的工厂类实例化的适配器生成(在启动查询后)。为该流提供一个名称,该名称可在以后用于检索特定于流的诊断。此外,提供用于适配器工厂的配置结构的实例。该配置结构将特定于运行时的信息传递到该适配器工厂以及所需的事件形状(事件模型)。有关该工厂如何使用这些参数的详细信息,请参阅创建输入和输出适配器。
定义查询
CepStream 对象用作实际查询逻辑定义的基础。该查询使用 LINQ 作为查询规范语言:
var filtered = from e in inputstream
where e.Value > 95
select e;
在这个示例中,我们假定在前面的示例中为创建输入流对象而定义的名为 MyDataType 的类或结构包含名为 Value 的字段。该定义转换为一个筛选器运算符,该运算符从没有满足筛选器谓词 where e.Value > 95 的流中删除所有事件。有关 LINQ 查询运算符的详细信息,请参阅在 LINQ 中编写查询模板。
创建输出适配器
此时,变量 filtered 的类型仍是 CepStream。这允许流转换为可启动的查询。为了生成可启动的查询实例,必须如下例中所示指定输出适配器。
var query = filtered.ToQuery(myApp,
"filterQuery",
"Filter out Values over 95",
typeof(MyOutputAdapterFactory),
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
与输入流一样,输出适配器也要求指定输出适配器工厂、配置对象、所需的输出流形状以及临时排序。
指定事件形状将确保相应的事件形状处于查询输出:
EventShape.Point:任何结果事件生存期都将缩减为点事件。
EventShape.Interval:任何结果事件都将解释为间隔事件。也就是说,如果事件的完整生存期由当前时间增量 (CTI) 事件提交,则该事件仅是输出。
EventShape.Edge:任何结果事件都将解释为边缘事件。也就是说,其开始时间作为开始边缘输出,并且其结束时间作为相应的结束边缘输出。
流事件顺序参数影响间隔事件输出流的生存情况。FullyOrdered 意味着间隔事件始终按照其开始时间的顺序输出,而 ChainOrdered 生成按间隔结束时间排序的输出序列。
此外,应用程序对象必须作为第一个参数提供,现在包含查询以及查询名称和说明(进一步在元数据存储区中标识此查询)。
启动查询
最后一步是启动查询。在这个例子中,查询由用户提供的击键操作停止。
query.Start();
Console.ReadLine();
query.Stop();
此端到端示例演示如何通过 CepStream.Create() 和 ToQuery() 重载将事件源的隐式绑定与查询模板一起使用,以便快速创建工作查询。有关对 CEP 对象绑定的更多显式控制,请参阅使用查询绑定器。
完整示例
下面的示例将前面介绍的各个部分综合起来,创建了一个完整的应用程序。
Server server = null;
using (Server server = Server.Create(”MyInstance”))
{
try
{
Application myApp = server.CreateApplication("MyApp");
var inputstream = CepStream<MyDataType>.Create("inputStream",
typeof(MyInputAdapterFactory),
new InputAdapterConfig { someFlag = true },
EventShape.Point);
var filtered = from e in inputstream
where e.Value > 95
select e;
var query = filtered.ToQuery(myApp,
"filterQuery",
"Filter out Values over 95",
typeof(MyOutputAdapterFactory),
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
query.Start();
Console.ReadLine();
query.Stop();
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}