使用查询绑定器

“查询绑定器”是一个客户端开发模型,当创建 StreamInsight 应用程序时可提供最大程度的灵活性和可重用性。在此模型中,适配器和查询模板注册为单独的元数据对象,这些对象随后可绑定在一起以便实例化查询。这使得开发人员能够通过在对象模型 API 的顶部使用显式查询绑定来完全控制其应用和开发环境。

显式服务器开发模型通常的使用案例包括具有以下要求的 StreamInsight 应用程序:

  • 完全控制和访问 StreamInsight 服务器。

  • 通过静态或动态查询组合重复使用查询,或重复使用由第三方定义的适配器、事件类型和查询模板。

查询绑定器开发模型的主要特征

查询绑定器模型具有以下主要特征:

  • 开发人员必须显式创建所有元数据对象并将其注册到 StreamInsight 服务器中。

  • 此模型支持创建和使用多个对象(查询模板、查询、应用程序和适配器)。所有对象都必须注册在一个应用程序下。

    查询模板和查询实例必须显式注册到服务器后,查询才能运行。输入和输出适配器必须显式注册,查询模板或查询才能引用这些对象。另外,所有对象必须注册在一个应用程序下。由适配器和查询模板使用的事件类型将隐式注册。

示例

下面的示例在服务器上创建一个 StreamInsight 服务器对象和一个名为 myApp 的应用程序对象。然后,它创建和注册导入、处理和导出事件流所需的所有必要的 StreamInsight 对象。

首先,创建服务器和应用程序对象。

server = Server.Create(“MyInstance”);
Application myApp = server.CreateApplication("MyApp");

其次,在应用程序中注册输入和输出适配器。

InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");

查询模板在未绑定流的顶部指定。创建未绑定流的唯一必需的参数为流名称,在后续进行适配器绑定时需要流名称。

var inputstream = CepStream<MyDataType>.Create("filterInput");

var filtered = from e in inputstream
               where e.Value > 95
               select e;

QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", filtered);
  • 最后一个调用在应用程序中注册查询模板。已注册的查询模板现在可以用在多个绑定中,并因此在多个查询中实例化,而每个查询均绑定到可能不同的输入和输出适配器。已注册查询模板的这些绑定通过 QueryBinder 对象进行定义:
QueryBinder queryBinder = new QueryBinder(filterQT);

queryBinder.BindProducer<MyDataType>("filterInput",
                                      inputAdapter,
                                      new InputAdapterConfig { someFlag = true },
                                      EventShape.Point);

queryBinder.AddConsumer("filterOutput",
                         outputAdapter,
                         new OutputAdapterConfig { someString = "foo" },
                         EventShape.Point,
                         StreamEventOrder.FullyOrdered);

Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);

BindProducer() 方法使用指定的名称(此处为“filterInput”)将输入适配器对象(必须在应用程序中注册)绑定到流。这样,您就可以区分查询模板的多个入口点。连同输入适配器一起,绑定特定的参数(适配器配置和所需的事件形状)是必需的。

AddConsumer() 方法将输出适配器对象(必须在应用程序中注册)绑定到查询模板的单个传出流。可以使用所提供的输出流名称(此处为“validated”)来标识该流以进行诊断。连同输入适配器一起,为输出适配器提供了绑定特定的参数。

查询对象基于查询绑定器、查询标识符和文本说明而创建。最后一步是启动查询。

query.Start();

具有多个输入流的查询

下面的示例显示了如何创建使用多个输入流的查询模板。查询模板可以具有多个入口点,每个入口点源自不同的数据源,例如,当必须联接两个流时。通过指定流名称发生适当的流关联,如下例中所示。

CepStream<SensorReading> sensorStream = CepStream<SensorReading>.Create("sensorInput");
CepStream<LocationData> locationStream = CepStream<LocationData>.Create("locationInput");

// Define query template in LINQ on top of sensorStream and locationStream
// ...
// Create query binder like in the previous example
// ...

InputAdapter inputAdapter = application.CreateInputAdapter<TextFileInputFactory>("CSVInput", "Reading tuples from a CSV file");

qb.BindProducer<SensorReading>("sensorInput", inputAdapter, sensorInputConf, EventShape.Interval);
qb.BindProducer<LocationData>("locationInput", inputAdapter, locationInputConf, EventShape.Edge);

修改现有应用程序

请注意,您使用的是具有查询模板和适配器对象的查询绑定器模型,而不必已在同一个应用程序中创建了它们。下面的示例假定与现有服务器建立了连接,并通过 StreamInsight 对象模型 API 检索现有元数据实体,而不是创建它们。

Application myApp = server.Applications["app1"];
QueryTemplate myQueryTemplate = myApp.QueryTemplates["qt1"];
InputAdapter myInputAdapter = myApp.InputAdapters["sensorAdapter5"];

使用持久化的元数据存储区

当创建 StreamInsight 服务器时,Server.Create() 方法的一个可选参数是要使用的元数据存储区的类型。默认情况下,元数据存储在内存中。此外,元数据也可以通过 SQL Server Compact 3.5 数据库持久保留在磁盘上。下面的示例演示如何将 SQL Server Compact 3.5 数据库指定为元数据存储区。

SqlCeMetadataProviderConfiguration metadataConfiguration = new SqlCeMetadataProviderConfiguration();
metadataConfiguration.DataSource = "SIMetadata.sdf";
metadataConfiguration.CreateDataSourceIfMissing = streamHostConfig.CreateDataSourceIfMissing;

server = Server.Create(”MyInstance”, metadataConfiguration);
Application myApp = server.CreateApplication("MyApp");

请注意,当创建服务器时指定现有元数据数据库将从指定的文件中读取所有元数据。然后,可以通过 StreamInsight 对象模型 API 检索元数据实体。

完整示例

using (Server server = Server.Create("MyInstance"))
{
try
{
    Application myApp = server.CreateApplication("MyApp");
    InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
    OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");

    var inputstream = CepStream<MyDataType>.Create("filterInput");

    var filtered = from e in inputstream
                   where e.Value > 95
                   select e;

    QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", "Description of the query template", filtered);
    QueryBinder queryBinder = new QueryBinder(filterQT);

    queryBinder.BindProducer<MyDataType>("filterInput",
                                         inputAdapter,
                                         new InputAdapterConfig { someFlag = true },
                                         EventShape.Point);

    queryBinder.AddConsumer("filterOutput",
                                                 outputAdapter,
                                                 new OutputAdapterConfig { someString = "foo" },
                                                 EventShape.Point,
                                                 StreamEventOrder.FullyOrdered);

    Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);

    query.Start();
    Console.ReadLine();
    query.Stop();
}
catch (Exception e)
{
    Console.WriteLine(e.ToString());
}
}

请参阅

概念

StreamInsight 服务器概念