Законченный пример StreamInsight
В этом разделе описываются различные компоненты и действия, связанные с созданием приложения StreamInsight, а также приведен полный пример приложения. В приложении StreamInsight сочетаются источники событий, приемники событий и запросы с целью реализации сценария обработки сложных событий. API-интерфейс StreamInsight предоставляет разнообразные интерфейсы для поддержки различных уровней управления созданием и поддержкой приложений по обработке событий на различных уровнях сложности.
Минимальной единицей развертывания приложения служит запрос, который можно запускать и останавливать. На следующем рисунке показан один из способов построения запроса. Источник события представляется входным адаптером. Адаптер передает поток событий в дерево операторов, которое представляет необходимую логику запросов, указанную конструктором в виде шаблона запроса. Затем поток обработанных событий поступает в приемник событий, которым обычно является выходной адаптер.
Разработчикам, не знакомым с терминологией обработки сложных событий, следует ознакомиться с разделами Основные понятия сервера служб StreamInsight и Архитектура сервера служб StreamInsight.
Процесс приложения
В этом разделе подробно представлены типичные действия по созданию законченного приложения.
Создание экземпляра сервера и приложения
Процесс начинается с создания экземпляра сервера StreamInsight и приложения.
server = Server.Create(”MyInstance”);
Application myApp = server.CreateApplication("MyApp");
Сервер должен создаваться с именем экземпляра, которое зарегистрировано на компьютере в процессе установки StreamInsight (в предыдущем примере это имя MyInstance). Дополнительные сведения см. в разделе Установка (StreamInsight).
Приложение представляет область сервера, содержащую другие сущности метаданных.
В предыдущем примере в том же процессе создается экземпляр сервера. Однако в другом распространенном варианте развертывания выполняется подключение к удаленному серверу, и работа ведется в существующем приложении на этом сервере. В следующем примере показано подключение к удаленному серверу и доступ к существующему приложению.
server = Server.Connect(new System.ServiceModel.EndpointAddress(@"https://localhost/StreamInsight/MyInstance"));
Application myApp = server.Applications["ExistingApp"];
Дополнительные сведения о локальных и удаленных серверах см. в разделе Публикация и соединение с сервером служб StreamInsight.
Создание входного потока
Затем создается входной поток на базе существующей реализации адаптера. В частности, необходимо указать фабрику адаптера, как показано в следующем примере.
var inputstream = CepStream<MyDataType>.Create("inputStream",
typeof(MyInputAdapterFactory),
new InputAdapterConfig { someFlag = true },
EventShape.Point);
В результате создается объект CepStream, представляющий поток событий, который создается (после запуска запроса) экземпляром адаптера, созданным по заданному классу фабрики. Поток получает имя, которое затем можно использовать для получения диагностических данных, относящихся к потоку. Кроме того, предоставляется экземпляр структуры конфигурации для фабрики адаптера. В структуре конфигурации передаются данные времени выполнения для фабрики, а также требуемая форма событий (модель событий). Дополнительные сведения об использовании этих параметров в фабрике см. в разделе Создание входных и выходных адаптеров.
Определение запроса
Объект CepStream служит основой для определения фактической логики запроса. В качестве языка спецификации запроса используется LINQ.
var filtered = from e in inputstream
where e.Value > 95
select e;
В этом примере предполагается, что класс или структура с именем MyDataType, определенные в предыдущем примере, чтобы создать объект входного потока, содержат поле с именем Value. Это определение преобразуется в оператор фильтра, который удаляет из потока все события, не удовлетворяющие предикату фильтра where e.Value > 95. Дополнительные сведения об операторах запросов LINQ см. в разделе Написание шаблонов запроса на языке LINQ.
Создание выходного адаптера
На этом этапе переменная filtered еще имеет тип CepStream. Это позволяет преобразовать поток в запрос, который можно запустить. Чтобы создать экземпляр запроса, который можно запустить, необходимо указать выходной адаптер, как показано в следующем примере.
var query = filtered.ToQuery(myApp,
"filterQuery",
"Filter out Values over 95",
typeof(MyOutputAdapterFactory),
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
Аналогично входному потоку, в выходном адаптере необходимо указать фабрику выходного адаптера, объект конфигурации, требуемую форму выходного потока и временного упорядочения.
Указание формы события гарантирует наличие необходимой формы событий в выходе запроса.
EventShape.Point: время существования всех результирующих событий сводится к точечному событию.
EventShape.Interval: любое результирующее событие обрабатывается как интервальное событие. Таким образом, событие выводится только в случае, если его полное время существования фиксируется событием увеличения текущего времени (CTI).
EventShape.Edge: любое результирующее событие обрабатывается как граничное событие. Это значит, что время начала события выводится как начальная граница, а время окончания как соответствующая конечная граница.
Параметр порядка событий потока влияет на активность выходных потоков интервальных событий. Значение FullyOrdered показывает, что интервальные события всегда выводятся в порядке их времени начала, а значение ChainOrdered создает выходную последовательность, упорядоченную по времени окончания интервала.
Кроме того, в качестве первого параметра необходимо указывать объект приложения, который будет содержать запрос, а также имя и описание запроса, уточняющие определение запроса в хранилище метаданных.
Запуск запроса
Последним шагом является запуск запроса. В этом примере запрос останавливается, когда пользователь нажимает определенную клавишу.
query.Start();
Console.ReadLine();
query.Stop();
В этом законченном примере показано, как использовать неявную привязку источника события к шаблону запроса посредством перегруженных вариантов методов CepStream.Create() и ToQuery(), чтобы быстро создать работающий запрос. Сведения о явном управлении привязкой для объектов обработки сложных событий см. в разделе Использование модуля привязки запроса.
Законченный пример
В следующем примере описанные ранее компоненты объединяются, образуя законченное приложение.
Server server = null;
using (Server server = Server.Create(”MyInstance”))
{
try
{
Application myApp = server.CreateApplication("MyApp");
var inputstream = CepStream<MyDataType>.Create("inputStream",
typeof(MyInputAdapterFactory),
new InputAdapterConfig { someFlag = true },
EventShape.Point);
var filtered = from e in inputstream
where e.Value > 95
select e;
var query = filtered.ToQuery(myApp,
"filterQuery",
"Filter out Values over 95",
typeof(MyOutputAdapterFactory),
new OutputAdapterConfig { someString = "foo" },
EventShape.Point,
StreamEventOrder.FullyOrdered);
query.Start();
Console.ReadLine();
query.Stop();
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}