Поделиться через


Использование модуля привязки запроса

Модуль привязки запроса — это клиентская модель разработки, которая предоставляет максимальный уровень гибкости и возможность повторного использования при создании приложений StreamInsight. В этой модели адаптеры и шаблоны запросов регистрируются как отдельные объекты метаданных, которые затем можно привязать друг к другу для создания экземпляра запроса. Таким образом, разработчик получает полный контроль над приложением и над средой разработки за счет применения явной привязки запроса на базе API объектной модели.

К типичным случаям применения явной серверной модели разработки относятся приложения StreamInsight, которым необходимы следующие условия.

  • Полный контроль над сервером StreamInsight и полный доступ к серверу.

  • Многократное использование запросов путем составления статических или динамических запросов, а также многократное использование адаптеров, типов событий и шаблонов запросов, определенных сторонними разработчиками.

Ключевые характеристики модели разработки для модуля привязки запроса

Модель модуля привязки запроса обладает следующими ключевыми характеристиками.

  • Разработчик должен явно создавать все объекты метаданных и регистрировать их на сервере StreamInsight.

  • Модель поддерживает создание и использование нескольких объектов (шаблонов запросов, запросов, приложений и адаптеров). Все объекты должны регистрироваться в приложении.

    Шаблон запроса и экземпляр запроса должны быть явно зарегистрированы на сервере перед выполнением запроса. Входные и выходные адаптеры должны явно регистрироваться, чтобы на эти объекты мог ссылаться шаблон запроса или запрос. Более того, все объекты должны регистрироваться в приложении. Типы событий, используемые адаптерами и шаблонами запросов, регистрируются неявно.

Примеры

В следующем примере создается объект сервера StreamInsight, а на сервере создается объект приложения с именем myApp. Затем создаются и регистрируются все объекты StreamInsight, необходимые для импорта, обработки и экспорта потоков событий.

Сначала создаются объект сервера и объект приложения.

server = Server.Create(“MyInstance”);
Application myApp = server.CreateApplication("MyApp");

Затем входной и выходной адаптеры регистрируются в приложении.

InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");

Шаблон запроса задается на основе непривязанного потока. Единственным необходимым параметром для создания непривязанного потока является имя потока, которое затем требуется для привязки адаптера.

var inputstream = CepStream<MyDataType>.Create("filterInput");

var filtered = from e in inputstream
               where e.Value > 95
               select e;

QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", filtered);
  • Последний вызов регистрирует шаблон запроса в приложении. Затем зарегистрированный шаблон запроса можно использовать в нескольких привязках и таким образом создавать его экземпляры в нескольких запросах, которые в общем случае могут привязываться к различным входным и выходным адаптерам. Такие привязки для зарегистрированных шаблонов запроса определяются посредством объекта QueryBinder:
QueryBinder queryBinder = new QueryBinder(filterQT);

queryBinder.BindProducer<MyDataType>("filterInput",
                                      inputAdapter,
                                      new InputAdapterConfig { someFlag = true },
                                      EventShape.Point);

queryBinder.AddConsumer("filterOutput",
                         outputAdapter,
                         new OutputAdapterConfig { someString = "foo" },
                         EventShape.Point,
                         StreamEventOrder.FullyOrdered);

Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);

Метод BindProducer() привязывает объект входного адаптера (который должен быть зарегистрирован в приложении) к потоку с указанным именем, в данном случае — filterInput. Это позволяет отличать друг от друга несколько точек входа в шаблоне запроса. Вместе с входным адаптером требуются параметры, относящиеся к привязке (конфигурация адаптера и нужная форма событий).

Метод AddConsumer() привязывает объект выходного адаптера (который должен быть зарегистрирован в приложении) к одиночному исходящему потоку шаблона запроса. Предоставляемое имя выходного потока (в данном случае validated) служит для идентификации потока в целях диагностики. Как и для входного адаптера, для выходного адаптера задаются параметры, относящиеся к привязке.

Объект запроса создается на основе модуля привязки запроса, идентификатора запроса и текстового описания. Последним шагом является запуск запроса.

query.Start();

Запросы с несколькими входными потоками

В следующем примере показано, как создать шаблон запроса, использующий несколько входных потоков. Шаблон запроса может иметь несколько точек входа, каждая из которых получает данные из отдельного источника данных (например, когда необходимо соединить два потока). Правильное сопоставление потоков выполняется за счет указания имени потока, как показано в следующем примере.

CepStream<SensorReading> sensorStream = CepStream<SensorReading>.Create("sensorInput");
CepStream<LocationData> locationStream = CepStream<LocationData>.Create("locationInput");

// Define query template in LINQ on top of sensorStream and locationStream
// ...
// Create query binder like in the previous example
// ...

InputAdapter inputAdapter = application.CreateInputAdapter<TextFileInputFactory>("CSVInput", "Reading tuples from a CSV file");

qb.BindProducer<SensorReading>("sensorInput", inputAdapter, sensorInputConf, EventShape.Interval);
qb.BindProducer<LocationData>("locationInput", inputAdapter, locationInputConf, EventShape.Edge);

Изменение существующего приложения

Обратите внимание, что при работе с шаблоном запроса и объектами адаптера в модели модуля привязки запроса не обязательно создавать их в одном приложении. В следующем примере предполагается наличие соединения с существующим сервером и сущности метаданных не создаются, а получаются с сервера через API объектной модели StreamInsight.

Application myApp = server.Applications["app1"];
QueryTemplate myQueryTemplate = myApp.QueryTemplates["qt1"];
InputAdapter myInputAdapter = myApp.InputAdapters["sensorAdapter5"];

Использование сохраняемого хранилища метаданных

При создании сервера StreamInsight в необязательном параметре метода Server.Create() указывается используемый тип хранилища метаданных. По умолчанию метаданные хранятся в памяти. Также можно указать хранение метаданных на диске в базе данных SQL Server Compact 3.5. В следующем примере показано, как указать базу данных SQL Server Compact 3.5 в качестве хранилища метаданных.

SqlCeMetadataProviderConfiguration metadataConfiguration = new SqlCeMetadataProviderConfiguration();
metadataConfiguration.DataSource = "SIMetadata.sdf";
metadataConfiguration.CreateDataSourceIfMissing = streamHostConfig.CreateDataSourceIfMissing;

server = Server.Create(”MyInstance”, metadataConfiguration);
Application myApp = server.CreateApplication("MyApp");

Учтите, что в случае указания существующей базы данных метаданных при создании сервера будут считаны все метаданные из указанного файла. Затем сущности метаданных будет возможно получить через API объектной модели StreamInsight.

Законченный пример

using (Server server = Server.Create("MyInstance"))
{
try
{
    Application myApp = server.CreateApplication("MyApp");
    InputAdapter inputAdapter = myApp.CreateInputAdapter<MyInputAdapterFactory>("DataSource", "Description of the input adapter");
    OutputAdapter outputAdapter = myApp.CreateOutputAdapter<MyOutputAdapterFactory>("Output", " Description of the output adapter ");

    var inputstream = CepStream<MyDataType>.Create("filterInput");

    var filtered = from e in inputstream
                   where e.Value > 95
                   select e;

    QueryTemplate filterQT = myApp.CreateQueryTemplate("filterLogic", "Description of the query template", filtered);
    QueryBinder queryBinder = new QueryBinder(filterQT);

    queryBinder.BindProducer<MyDataType>("filterInput",
                                         inputAdapter,
                                         new InputAdapterConfig { someFlag = true },
                                         EventShape.Point);

    queryBinder.AddConsumer("filterOutput",
                                                 outputAdapter,
                                                 new OutputAdapterConfig { someString = "foo" },
                                                 EventShape.Point,
                                                 StreamEventOrder.FullyOrdered);

    Query query = myApp.CreateQuery("filterQuery", "My Filter Query", queryBinder);

    query.Start();
    Console.ReadLine();
    query.Stop();
}
catch (Exception e)
{
    Console.WriteLine(e.ToString());
}
}

См. также

Основные понятия

Основные понятия сервера служб StreamInsight