Поделиться через


Составление запросов во время выполнения

Составление запросов StreamInsight во время выполнения обеспечивает гибкость запросов, возможность повторного использования, повышает эффективность использования ресурсов и упрощает обслуживание. Оно позволяет выполнять следующие действия.

  • Передавать результат одного запроса другим запросам на том же сервере.

  • Получать выходные данные других выполняющихся запросов аналогично получению событий из входного адаптера.

Два составных запроса, например запрос 1, передающий данные в запрос 2, работают изолированно. Если запрос 1 завершается ошибкой, это не отражается на состоянии запроса 2, и наоборот. Запрос 1 и запрос 2 можно запускать и останавливать независимо. Например, можно остановить запрос 1, заменить его другим запросом и запустить снова.

В этом разделе описано несколько вариантов применения и примеры динамического составления запросов во время выполнения.

Повторное использование выходных данных существующего запроса

Распространенным вариантом применения для нескольких запросов является необходимость спроектировать и развернуть основной запрос, который выполняет предварительную обработку данных и отправляет их в выходной адаптер, а остальные запросы получают результат основного запроса и отправляют собственные результаты в другие выходные адаптеры. Этот сценарий показан на следующем рисунке.

В запросе 2 используются данные из запроса 1.

В следующем примере представлен запрос, который создан в существующем приложении myApp на сервере StreamInsight.

    var inputstream = CepStream<MyDataType>.Create("inputStream",
                                                   typeof(MyInputAdapterFactory),
                                                   new InputAdapterConfig { someFlag = true },
                                                   EventShape.Point);

    var filtered = from e in inputstream
                   where e.Value > 95
                   select e;

    var query = filtered.ToQuery(myApp,
                                 "filterQuery",
                                 "Filter out Values over 95",
                                 typeof(MyOutputAdapterFactory),
                                 new OutputAdapterConfig { someString = "foo" },
                                 EventShape.Point,
                                 StreamEventOrder.FullyOrdered);

    query.Start();

Для потоковой передачи результатов этого запроса во второй запрос используется метод Query.ToStream(). В качестве универсального параметра задается тип, который соответствует выходным полезным данным основного запроса, как показано в следующем примере.

var filteredStream = query.ToStream<MyDataType>();

var validated = from e in filteredStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };

var validationQuery = validated.ToQuery("validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

validationQuery.Start();

В этом примере выполняется доступ к потоку выхода основного запроса и применяется оператор проекции (для создания нового поля с именем Status). При втором вызове ToQuery() объект приложения больше не требуется, поскольку его можно получить от основного запроса.

Метод ToStream() принимает необязательный объект AdvanceTimeSettings, если на данном этапе необходимо вставлять события увеличения текущего времени (CTI). Вставка событий CTI повышает актуальность результатов в некоторых конфигурациях запросов.

Учтите, что способ создания объекта основного запроса не имеет значения. В предыдущей модели показан пример использования API CepStream.ToQuery(). Далее представлены другие возможности для создания запроса.

  • Посредством модуля привязки запроса. Например, myApp.CreateQuery("filterQuery", queryBinder, "description");

  • Получение с сервера через API объектной модели. Например, myApp.Queries["filterQuery"]

Непривязанные выходные данные запроса

В предыдущем примере показано повторное использование результата существующего запроса, где выходные данные уже привязаны к выходному адаптеру. В запросах также может применяться непривязанный выходной поток, когда выходные данные не создаются, если результаты запроса не принимаются одним или несколькими другими запросами. Этот сценарий показан на следующем рисунке.

Запрос 1 имеет несвязанный поток запроса.

Это выполняется с помощью перегруженной версии метода CepStream.ToQuery(), для которой не требуется адаптер:

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             EventShape.Point, 
                             StreamEventOrder.FullyOrdered);

Этот запрос можно запускать. Затем второй запрос может получать результирующий поток первого запроса, указав его, как показано в предыдущем примере для запроса validationQuery. Если получатели отсутствуют, то результаты основного запроса удаляются.

Такая схема также позволяет передавать поток результатов запроса в несколько выходных адаптеров. В простейшем случае это выполняется с помощью передаваемых запросов на основе непривязанного запроса, по одному запросу на каждый выходной адаптер (запросы 2 и 3 на приведенном выше рисунке).

Опубликованные потоки

В рассмотренных ранее примерах для создания нового выходного потока для другого запроса использовался фактический объект запроса. Для реализации абстрактных клиентских объектов можно использовать URI опубликованного потока в качестве входных данных для одного или нескольких запросов, как показано на следующем рисунке.

Запросы, использующие опубликованный поток в качестве входных данных.

Каждый запрос имеет URI опубликованного потока по умолчанию, который представляет имя самого запроса. Кроме того, можно явно назначить запросу имя пользовательского опубликованного потока через соответствующий член класса CepStream.

var query = filtered.ToPublishedStreamQuery(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                             myApp,
                                             "filterQuery",
                                             "Filter out Values over 95",
                                             EventShape.Point,
                                             StreamEventOrder.FullyOrdered);

В результате создается запрос, выход которого не привязан, но имеет явно заданное имя. Обратите внимание, что имена опубликованных потоков должны иметь вид «<application_name>/PublishedStream/<stream_name>».

Теперь другой запрос может ссылаться на этот URI в качестве входного потока, как показано в следующем примере.

var filterStream = CepStream<MyDataType>.Create(new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered"),
                                                EventShape.Point);
var validated = from e in filterStream
                ...

Обратите внимание, что получатель опубликованного потока должен указывать форму входных событий, которая должна совпадать с формой выходных событий упоминаемого запроса.

Соединение с основным запросом по имени опубликованного потока выполняется менее строго, чем по объекту запроса. Поэтому при определении дополнительного запроса приложению необходимо передавать следующие параметры:

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

Адаптеры опубликованных потоков

При получении адаптеров составленного запроса (например, посредством Query.InputStreamBindings) обратите внимание, что для соединения адаптеров используются специальные встроенные адаптеры. Возможности составления запросов с помощью CepStream.ToQuery, Query.ToStream() и так далее, как показано выше, позволяют удобно работать с запросами на основе этих встроенных адаптеров. Их также можно использовать явно, подобно обычным адаптерам, когда собственная структура конфигурации содержит имя опубликованного потока, как показано в следующем примере.

// primary query, with custom input and output adapters
var inputstream = CepStream<MyDataType>.Create("inputStream",
                                               typeof(MyInputAdapterFactory),
                                               new InputAdapterConfig { someFlag = true },
                                               EventShape.Point);

var filtered = from e in inputstream
               where e.Value > 95
               select e;

var query = filtered.ToQuery(myApp,
                             "filterQuery",
                             "Filter out Values over 95",
                             typeof(MyOutputAdapterFactory),
                             new OutputAdapterConfig { someString = "foo" },
                             EventShape.Point,
                             StreamEventOrder.FullyOrdered);

// secondary query, composed on top of the first one using the
// built-in published stream input adapter and the default published
// stream name of the primary query
var filterStream = CepStream<MyDataType>.Create("filteredStream",
                                                typeof(PublishedStreamAdapterFactory),
                                                new PublishedStreamInputAdapterConfiguration { PublishedStreamName = query.Name },
                                                EventShape.Point);

var validated = from e in filterStream
                ...

var validationQuery = validated.ToQuery(myApp,
                                        "validationQuery",
                                        "Validating Query",
                                        typeof(MyOutputAdapterFactory),
                                        new OutputAdapterConfig { someString = "foo" },
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

Аналогичным образом в запросе можно использовать выходной адаптер опубликованного потока, функции которого аналогичны CepStream.toPublishedStreamQuery():

var filterQuery = filtered.ToQuery(myApp,
                                   "filterQuery",
                                   "desc",
                                   typeof(PublishedStreamAdapterFactory),
                                   new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1") },
                                   EventShape.Point,
                                   StreamEventOrder.FullyOrdered);

Использование модуля привязки запроса

Модель разработки для модуля привязки запроса обеспечивает полный контроль над различными объектами метаданных StreamInsight и явно разграничивает привязку и использование запроса от этапа проектирования шаблона запроса. Эта модель также поддерживает динамическое составление запросов как во входной привязке, так и в выходной привязке. Дополнительные сведения см. в разделе Использование модуля привязки запроса.

Привязка к другому запросу в виде входных данных

Модуль привязки запроса может привязывать шаблон запроса к входному адаптеру в качестве источника событий, а также выполнять привязку к существующему запросу. Предположим, существует основной запрос (с привязанными или непривязанными выходными данными), как в первом примере.

var query = filtered.ToQuery(myApp, ...);

Затем можно использовать модуль привязки запроса следующим образом, ссылаясь на предыдущий запрос в подходящей перегруженной версии BindProducer.

var newStream = CepStream<RawData>.Create("validationInput");
var validated = from e in newStream
                select new
                {
                    SourceId = e.SourceId,
                    Value = e.Value,
                    Status = e.Value > 75 ? false : true
                };
QueryTemplate validateQT = myApp.CreateQueryTemplate("validationLogic", "validates the Value field", validated);
QueryBinder queryBinder = new QueryBinder(validateQT);
queryBinder.BindProducer("validationInput", filterQuery);
queryBinder.AddConsumer(...);

Кроме того, модуль привязки запроса может ссылаться на опубликованный поток в качестве источника событий.

queryBinder.BindProducer("validationInput",
                         new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                         EventShape.Point);

Как в случае с сигнатурой Query.ToStream(), в BindProducer() можно указывать необязательный объект AdvanceTimeSettings.

Привязка к опубликованному потоку в качестве выходных данных

На выходной стороне модуль привязки запроса поддерживает потоковую передачу в явно определенный опубликованный поток.

queryBinder.BindOutputToPublishedStream(new Uri("cep:/Server/Application/MyApp/PublishedStream/ps1"),
                                        EventShape.Point,
                                        StreamEventOrder.FullyOrdered);

После запуска запроса, основанного на этом модуле привязки запроса, другие запросы могут привязывать его к опубликованному потоку, как показано в предыдущих примерах, и получать его результирующие события.

Привязка к адаптерам опубликованных потоков

В модели модуля привязки запроса также можно использовать адаптеры опубликованных потоков. Их можно получать из объекта приложения и использовать в BindProducer и AddConsumer, подобно обычным адаптерам.

queryBinder.BindProducer("validationInput",
                         myApp.GetPublishedStreamInputAdapter(),
                         new PublishedStreamInputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/filtered") },
                         EventShape.Point);
queryBinder.AddConsumer("validated",
                         myApp.GetPublishedStreamOutputAdapter(),
                         new PublishedStreamOutputAdapterConfiguration { PublishedStreamName = new Uri("cep:/Server/Application/MyApp/PublishedStream/validated") },
                         EventShape.Point,
                         StreamEventOrder.FullyOrdered);

См. также

Основные понятия

Законченный пример StreamInsight

Обработка времени приложения