在运行时撰写查询

通过在运行时撰写 StreamInsight 查询可提高查询灵活性、可重用性、有效利用资源并且易于维护。它支持您:

  • 将一个查询的查询结果提供给同一服务器上的其他查询。

  • 使用其他运行查询的输出,就像使用来自输入适配器的事件那样。

两个撰写的查询(例如,查询 1 为查询 2 提供输入)是分开运行的。如果查询 1 失败,查询 2 的状态不受影响;反之亦然。可相对独立地启动和停止查询 1 和查询 2。例如,可以停止查询 1,代之以不同的查询,然后重新启动它。

本主题介绍了在运行时动态撰写查询的若干用例和示例。

重用现有查询的输出

对于多个查询,最常用的用例是需要设计和部署主要查询,该查询预先处理数据并将其发送到某个输出适配器;而其他查询则使用此查询的结果,并将它们自己的结果发送到其他输出适配器下图演示了这种应用场景。

查询 2 使用来自查询 1 的数据。

以下示例表示一个在 StreamInsight 服务器上的现有应用程序 myApp 中创建的查询。

    var inputstream = CepStream<MyDataType>.Create("inputStream",
                                                   typeof(MyInputAdapterFactory),
                                                   new InputAdapterConfig { someFlag = true },
                                                   EventShape.Point);

    var filtered = from e in inputstream
                   where e.Value > 95
                   select e;

    var query = filtered.ToQuery(myApp,
                                 "filterQuery",
                                 "Filter out Values over 95",
                                 typeof(MyOutputAdapterFactory),
                                 new OutputAdapterConfig { someString = "foo" },
                                 EventShape.Point,
                                 StreamEventOrder.FullyOrdered);

    query.Start();

要将此查询的结果流式传送到另一个查询,需要使用 Query.ToStream() 方法。与主查询的输出负载匹配的类型被指定为一个通用参数,如下例所示。

var filteredStream = query.ToStream<MyDataType>();

var validated = from e in filteredStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };

var validationQuery = validated.ToQuery("validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

validationQuery.Start();

在本例中,访问了主查询的输出流,并且应用了一个投影运算符以引入名为 Status 的新字段。第二个 ToQuery() 调用不再需要应用程序对象,因为可以从主查询推断该对象。

ToStream() 方法带有可选的 AdvanceTimeSettings 对象(如果必须在此时注入当前时间增量 (CTI))。插入 CTI 可帮助提高特定查询配置的活动性。

请注意,主查询对象是怎样创建的无关紧要。上一个模型演示了使用 CepStream.ToQuery() API 的示例。还可能通过以下方式创建查询:

  • 通过查询绑定器。例如,myApp.CreateQuery("filterQuery", queryBinder, "description");

  • 从服务器通过对象模型 API 检索。例如,myApp.Queries["filterQuery"]

未绑定的查询输出

上例演示如何重用现有查询的结果,其中查询的输出已经绑定到某个输出适配器。查询也可能具有未绑定的输出流,这样只有在一个或多个其他查询使用该查询的结果时才生成输出。下图演示了这种应用场景。

Query 1 具有未绑定的查询流。

这是通过使用不需要适配器的 CepStream.ToQuery() 的重载来完成的:

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             EventShape.Point, 
                             StreamEventOrder.FullyOrdered);

可以启动此查询。通过按上例所示为查询 validationQuery 指定此查询,另一个查询可稍后使用此查询的结果流。如果没有任何使用者,主查询的结果将被删除。

此模式还支持您将查询的结果流式传送到多个输出适配器。对于最简单的情况,只需在未绑定查询之上使用传递查询(每个输出适配器一个传递查询)即可实现(上图中的查询 2 和 3)。

发布的流

截至目前,示例中均使用实际查询对象来为另一个查询创建新的输入流。为了使客户端对象抽象化,可以用已发布的流 URI 作为一个或多个其他查询的输入,如下图所示。

使用已发布流作为输出的查询。

每个查询具有默认的已发布流的统一资源标识符 (URI)(即查询名称本身)。此外,可以通过 CepStream 类的适当成员为查询显式分配一个自定义已发布流名称。

var query = filtered.ToPublishedStreamQuery(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                             myApp,
                                             "filterQuery",
                                             "Filter out Values over 95",
                                             EventShape.Point,
                                             StreamEventOrder.FullyOrdered);

这将创建一个具有未绑定的但已显式命名的输出的查询。请注意,已发布流的名称必须遵循“<application_name>/PublishedStream/<stream_name>”约定。

现在另一个查询可以引用此 URI(作为其输入流),如下例所示。

var filterStream = CepStream<MyDataType>.Create(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                                EventShape.Point);
var validated = from e in filterStream
                ...

请注意,已发布流的使用者必须指定输入事件形状,该事件形状必须与引用的查询的输出形状相匹配。

通过已发布流名称连接到主查询不像通过查询对象连接那样紧密。因此,定义次要查询时,必须提供某个应用程序:

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

已发布流适配器

检索已撰写查询的适配器(例如,通过 Query.InputStreamBindings)时,您会注意到将使用特殊的内置适配器来连接它们。通过 CepStream.ToQuery、 Query.ToStream() 等撰写查询的功能(如上所示)是这些内置适配器之上的便捷图面。也可以将它们显式用作普通适配器,但它们具有自己的配置结构(其中包含已发布流名称),如下例所示:

// primary query, with custom input and output adapters
var inputstream = CepStream<MyDataType>.Create("inputStream",
                                               typeof(MyInputAdapterFactory),
                                               new InputAdapterConfig { someFlag = true },
                                               EventShape.Point);

var filtered = from e in inputstream
               where e.Value > 95
               select e;

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             typeof(MyOutputAdapterFactory),
                             new OutputAdapterConfig { someString = "foo" },
                             EventShape.Point,
                             StreamEventOrder.FullyOrdered);

// secondary query, composed on top of the first one using the
// built-in published stream input adapter and the default published
// stream name of the primary query
var filterStream = CepStream<MyDataType>.Create("filteredStream",
                                                typeof(PublishedStreamAdapterFactory),
                                                new PublishedStreamInputAdapterConfiguration { PublishedStreamName = query.Name },
                                                EventShape.Point);

var validated = from e in filterStream
                ...

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

同样,查询可以使用已发布流输出适配器,该适配器具有与 CepStream.toPublishedStreamQuery() 相同的功能:

var filterQuery = filtered.ToQuery(myApp,
                                   "filterQuery",
                                   "desc",
                                   typeof(PublishedStreamAdapterFactory),
                                   new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1") },
                                   EventShape.Point,
                                   StreamEventOrder.FullyOrdered);

使用查询绑定器

查询绑定器开发模型允许完全控制各种 StreamInsight 元数据对象,并且将查询的绑定和使用与查询模板设计阶段明确区分开来。此模型还支持动态查询撰写,对输入绑定一端和输出绑定一端均支持。有关详细信息,请参阅使用查询绑定器

绑定到作为输入的其他查询

查询绑定器可以绑定到现有查询,正如它可以将查询模板绑定到作为事件生成器的输入适配器那样。假定像在第一个示例中那样存在一个主查询(具有绑定或未绑定的输出)。

var query = filtered.ToQuery(myApp, ...);

随后,按如下方式使用查询绑定器,引用 BindProducer 的正确重载中的前一个查询。

var newStream = CepStream<RawData>.Create("validationInput");
var validated = from e in newStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };
QueryTemplate validateQT = myApp.CreateQueryTemplate("validationLogic", "validates the Value field", validated);
QueryBinder queryBinder = new QueryBinder(validateQT);
queryBinder.BindProducer("validationInput", filterQuery);
queryBinder.AddConsumer(...);

或者,查询绑定器可以将已发布流视作事件生成器。

queryBinder.BindProducer("validationInput",
                         new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                         EventShape.Point);

像对 Query.ToStream() 签名那样,可以在 BindProducer() 中指定 AdvanceTimeSettings 对象。

绑定到作为输出的已发布流

在输出一端,查询绑定器允许流式传送到显式定义的已发布流。

queryBinder.BindOutputToPublishedStream(new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

只要一启动基于此查询绑定器的某个查询,其他查询就可以绑定到以上示例中说明的已发布流,并使用其结果事件。

绑定到已发布流适配器

还可以在查询绑定器模型中使用已发布流适配器。可以从应用程序对象中检索已发布流适配器并用在 BindProducer 和 AddConsumer 中,就像普通适配器那样:

queryBinder.BindProducer("validationInput",
                         myApp.GetPublishedStreamInputAdapter(),
                         new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered") },
                         EventShape.Point);
queryBinder.AddConsumer("validated",
                         myApp.GetPublishedStreamOutputAdapter(),
                         new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/validated") },
                         EventShape.Point,
                         StreamEventOrder.FullyOrdered);

请参阅

概念

StreamInsight 端到端示例

将应用程序时间提前