Поделиться через


Создание входных и выходных адаптеров

В этом разделе приведены общие сведения, необходимые при создании входных и выходных адаптеров для приложений обработки сложных событий (CEP) с использованием платформы StreamInsight. Адаптеры являются программными преобразователями, которые передают события на сервер StreamInsight или с сервера StreamInsight.

Основные сведения о потоке событий и управлении им

Во время создания адаптеров важно представлять структуру потока событий через сервер StreamInsight и действия, выполняемые входными и выходными адаптерами для управления этим потоком. Как показано на следующем рисунке, поток событий идет из источника в приемник через постоянный запрос и является однонаправленным. События считываются из источника входным адаптером, который доставляет их в запрос. Входные события или новые события, полученные в результате обработки входных событий, передаются от одного оператора к следующему оператору в запросе. Запрос передает обработанные события в выходной адаптер, который доставляет их в приемник. На рисунке показан сценарий, в котором запрос StreamInsight привязан к двум экземплярам входного адаптера a1 и a2, а также к экземпляру выходного адаптера a4.

Поток событий от входного адаптера к выходному адаптеру

Поток событий идет в одном направлении от источника к приемнику, но управление потоком и выполнением для получения и передачи событий в некоторых точках взаимодействия между компонентами может выполняться в обоих направлениях. Такие точки взаимодействия обозначены на рисунке операциями READ, ENQUEUE, DEQUEUE и WRITE.

В реализации входного адаптера операция READ должна выполняться с использованием механизмов доступа, соответствующих исходному устройству (файлу или базе данных), а операция ENQUEUE должна выполняться с помощью API-интерфейсов адаптера. Аналогично, в реализации выходного адаптера операция WRITE должна выполняться с использованием механизмов доступа, соответствующих приемному устройству, а операция DEQUEUE должна выполняться с помощью API-интерфейсов адаптера. Операции ENQUEUE и DEQUEUE должны реализовываться по шаблону проектирования, указанному на диаграмме переходов между состояниями адаптера, которая описывается далее в этом разделе.

С точки зрения управления потоком событий можно представить, что события принудительно передаются от поставщика к получателю (обозначается стрелками, идущими слева направо) или запрашиваются получателем от поставщика (обозначается изогнутыми стрелками). В точках взаимодействия READ и WRITE реализация адаптера может следовать принципам принудительной отправки или передачи по запросу для управления потоком событий. В этом взаимодействии следует учитывать скорость обработки событий, поддерживаемую источником и приемником, возможность адаптера регулировать скорость источника или приемника, а также средства буферизации, которые можно реализовать.

Для исходных устройств, которые передают события с очень малой задержкой и для которых затруднено регулирование, обычно реализуется такой адаптер, в который исходное устройство принудительно передает события. Примерами таких устройств служат датчики (машинно-создаваемые события), генераторы биржевых котировок и сетевые порты. Для устройств с большей задержкой (файлы, базы данных) рекомендуется реализация, в которой адаптер запрашивает данные из источника. Аналогичным образом, на выходной стороне для устройства, которое может принимать события с очень высокой пропускной способностью, можно реализовать выходной адаптер, принудительно отправляющий события в устройство. Для более медленных выходных устройств можно реализовать схему, в которой устройство запрашивает адаптер каждый раз, когда становится готово к получению событий.

В точке взаимодействия ENQUEUE сервер StreamInsight поддерживает модель принудительной передачи. Это значит, что шаблон проектирования адаптера позволяет ставить в очередь столько событий, сколько может обработать система в каждый момент времени. В точке взаимодействия DEQUEUE сервер StreamInsight поддерживает модель передачи по запросу. Это означает, что в шаблоне проектирования адаптера ожидается передача событий по запросу с сервера на скорости, поддерживаемой обрабатывающей системой.

В данном случае политика регулирования для сервера StreamInsight реализуется очевидным образом. При условии, что используется простой передаваемый запрос без блокирующих операций, скорость получения событий сервером StreamInsight из входного адаптера в точке взаимодействия ENQUEUE ограничивается только скоростью, с которой выходной адаптер может получать события с сервера в точке взаимодействия DEQUEUE. Эффект обратного воздействия, оказываемого сервером StreamInsight на входной адаптер в точке ENQUEUE, определяется скоростью, с которой запрос может освобождать выходные данные, и скоростью, с которой выходной адаптер может принимать эти данные. StreamInsight предлагает обширный набор представлений диагностики, которые помогают измерить скорость событий в каждой из этих точек взаимодействия. Дополнительные сведения см. в разделе Наблюдение за сервером служб StreamInsight и запросами.

Задачи по разработке адаптеров

Для разработки адаптеров применяется следующий контрольный список.

  • Определите нужный тип адаптера (входной или выходной).

    Входной адаптер считывает входящие события в формате, в котором они передаются, и преобразует эти данные в формат, который обрабатывается сервером StreamInsight.

    Выходной адаптер получает события, обработанные сервером StreamInsight, преобразует их в формат, ожидаемый выходным устройством, и передает данные в это устройство.

  • Определение типа событий.

    Для входного адаптера определите тип событий, который описывает полезные данные событий, предоставляемых источником. Для выходного адаптера укажите тип событий, который описывает полезные данные событий, получаемых приемником. Дополнительные сведения о полезных данных событий см. в разделе Основные понятия сервера служб StreamInsight.

    Для источника или приемника, который всегда создает или получает события с фиксированным форматом полезных данных, когда количество полей и их типы известны заранее, следует указать и построить типизированный адаптер. Главным преимуществом типизированного адаптера является относительная простота реализации создания событий для постановки в очередь на сервер StreamInsight. Поскольку типы полей уже известны, для заполнений полей можно использовать технологию IntelliSense в Visual Studio (или эквивалентную функцию в другой интегрированной среде разработки).

    Если источник или приемник создает или получает полезные данные различных форматов, следует указать и построить нетипизированный адаптер. Главным преимуществом нетипизированного адаптера является гибкость указания типа событий во время привязки запроса, а не жесткое задание определенного типа событий в реализации адаптера. По сравнению с типизированным адаптером реализация нетипизированного адаптера требует больших усилий. Нетипизированный входной адаптер необходимо создавать таким образом, чтобы тип каждого поля можно было определить по параметрам конфигурации, которые передаются во время привязки запроса, поочередно заполнить поля, а затем поставить событие в очередь. Аналогично, нетипизированный выходной адаптер должен иметь возможность получать результат обработки запроса из события, выведенного из очереди, в зависимости от данных конфигурации, переданных в выходном потоке.

    Важно заметить, что экземпляр адаптера (типизированного или нетипизированного), привязанный к запросу, всегда передает события, содержащие полезные данные одного определенного типа. Дополнительные сведения см. в разделе Создание типов событий.

  • Определение модели событий.

    Определите модель для входных событий и событий выхода. StreamInsight поддерживает три модели событий: точечную, интервальную и граничную. Если источник передает события с фиксированной моделью, можно создать входной адаптер, предназначенный только для этой модели. Аналогичным образом, если приемнику необходимы события, относящиеся к определенной модели, можно создать выходной адаптер, предназначенный только для этой модели. Однако большинству приложений могут понадобиться все модели событий для определенного типа событий. Рекомендуется построить типизированный или нетипизированный адаптер для каждой модели событий. Дополнительные сведения о моделях событий см. в разделе Основные понятия сервера служб StreamInsight.

    Классы AdapterFactory для входных и выходных адаптеров позволяют упаковать эти адаптеры вместе. Экземпляр нужного адаптера можно создавать во время привязки запроса в зависимости от параметров конфигурации.

  • Выберите соответствующий базовый класс адаптера.

    Выберите подходящий базовый класс адаптера в зависимости от типа события и модели события. Номенклатура классов имеет вид [Typed][Point | Interval | Edge][Input | Output]. Нетипизированные адаптеры не имеют префикса typed.

    Тип адаптера

    Базовый класс входного адаптера

    Базовый класс выходного адаптера

    Типизированный, точечный

    TypedPointInputAdapter

    TypedPointOutputAdapter

    Нетипизированный, точечный

    PointInputAdapter

    PointOutputAdapter

    Типизированный, интервальный

    TypedIntervalInputAdapter

    TypedIntervalOutputAdapter

    Нетипизированный, интервальный

    IntervalInputAdapter

    IntervalOutputAdapter

    Типизированный, граничный

    TypedEdgeInputAdapter

    TypedEdgeOutputAdapter

    Нетипизированный, граничный

    EdgeInputAdapter

    EdgeOutputAdapter

    Дополнительные сведения см. в разделе Microsoft.ComplexEventProcessing.Adapters.

  • Создайте классы AdapterFactory для входного и выходного адаптера.

    Класс AdapterFactory служит контейнером для адаптеров. Необходимо реализовать класс фабрики. Базовые классы фабрики имеют следующую структуру.

    Тип адаптера

    Базовый класс входного адаптера

    Базовый класс выходного адаптера

    Типизированный

    ITypedInputAdapterFactory

    ITypedOutputAdapterFactory

    Нетипизированный

    IInputAdapterFactory

    IOutputAdapterFactory

    Типизированный с поддержкой эластичности

    IHighWaterMarkTypedInputAdapterFactory

    IHighWaterMarkTypedOutputAdapterFactory

    Нетипизированный с поддержкой эластичности

    IHighWaterMarkInputAdapterFactory

    IHighWaterMarkOutputAdapterFactory

    Класс фабрики выполняет следующие функции.

    • Он реализует общий доступ к ресурсам со стороны различных реализаций адаптера для заданного класса устройств (CSV-файл, база данных SQL Server, общий формат журнала веб-сервера) или требования приложения, а также упрощает передачу параметров конфигурации в конструктор адаптера. Например, приложению могут потребоваться все три модели событий (точечная, интервальная и граничная). Одна фабрика может поддерживать три реализации адаптера, по одной на каждую модель событий. Кроме того, приложение может иметь такой же источник события, как и таблица базы данных, однако источник создает несколько структур полезных данных события из одного источника в зависимости от выполняемых запросов. В этом случае одна фабрика может поддерживать реализации адаптера для обработки каждой структуры полезных данных.

    • Он служит шлюзом между адаптером и средой выполнения сервера. Разработчик адаптера должен реализовать методы Create() и Dispose() в фабрике адаптера для класса адаптера. Эти методы вызываются сервером в ходе запуска и завершения работы запроса.

    • Он служит шлюзом адаптера для передачи данных конфигурации до времени выполнения. Это особенно важно для нетипизированных адаптеров, которые должны определять тип каждого поля в структуре по параметрам конфигурации, передаваемым во время привязки запроса. Структуру конфигурации можно определить в классе фабрики, а затем передать с помощью метода Create() в метод конструктора класса адаптера. Эта структура конфигурации сериализуется с помощью DataContractSerialization. Не считая этого ограничения, методика разработки дает полную гибкость в определении данной структуры конфигурации, вопросах ее заполнения и использования в конструкторе адаптера.

    • Он предоставляет способ создания событий увеличения текущего времени (CTI) без явной постановки их в очередь посредством входного адаптера. Реализация интерфейса ITypedDeclareAdvanceTimePolicy (для фабрики типизированного адаптера) и IDeclareAdvanceTimePolicy (для фабрики нетипизированного адаптера) в классе фабрики адаптера позволяет пользователю указывать частоту и отметки времени событий CTI. Это упрощает код адаптера и может распространяться на каждый поток событий, создаваемый фабрикой в экземплярах адаптера. Дополнительные сведения см. в разделе [AdvanceTimeSettingsClass].

    • В эластичных приложениях благодаря этому поддерживается эластичность путем предоставления верхней конечной точки входному адаптеру для воспроизведения недостающих событий, а также предоставления верхней конечной точки и смещения выходному адаптер для устранения повторяющихся событий. Дополнительные сведения см. в разделе Эластичность StreamInsight.

  • Построение и тестирование адаптера.

    Скомпилируйте адаптер и выполните его построение в виде сборки .NET. Проверьте работоспособность основных операций адаптера в простом передаваемом запросе, который считывает события из входного адаптера и передает их в выходной адаптер, не выполняя никакой сложной обработки. Это позволит убедиться, что адаптер выполняет чтение данных из устройств и запись данных в устройства, а также в состоянии ставить события в очередь и выводить их из очереди.

Конечный автомат адаптера

Конечный автомат, определяющий взаимодействие между адаптером и сервером StreamInsight, выглядит одинаково для входных и выходных адаптеров. Это важно, поскольку конечный автомат предоставляет единую модель разработки. Конечный автомат показан на следующем рисунке.

Диаграмма состояния постановки в очередь и вывода из очереди адаптера

Далее представлены основные функции и требования к работе этого конечного автомата.

  • Методы Start() и Resume() вызываются сервером StreamInsight и должны быть реализованы разработчиком адаптера. Кроме того, разработчик должен реализовать метод конструктора для класса адаптера и метод Dispose(), который наследуется из базового класса.

  • Реализация адаптера, в свою очередь, должна вызывать следующие методы, предоставляемые в SDK адаптера.

    • Enqueue() для входного адаптера. Этот метод возвращает значение EnqueueOperationResult.Success или EnqueueOperationResult.Full.

    • Dequeue() для выходного адаптера. Этот метод возвращает значение DequeueOperationResult.Success или DequeueOperationResult.Empty.

    • Ready(). Этот метод возвращает логическое значение TRUE или FALSE.

    • Stopped(). Этот метод возвращает логическое значение TRUE или FALSE.

  • Сервер StreamInsight выполняет асинхронный вызов внутреннего метода (обозначаемого StopQuery()) от имени пользователя, когда администратор или разработчик запроса останавливает выполнение запроса посредством методов из API сервера.

  • Вызовы Enqueue() и Dequeue() возвращают соответственно состояния Full и Empty, когда адаптер находится в одном из следующих состояний.

    • Приостановлен

    • Останавливается

  • Вызовы метода Enqueue() или Dequeue() приводят к созданию исключения, если адаптер находится в одном из следующих состояний.

    • Создан

    • Остановлен

  • Вызовы метода Ready() приводят к созданию исключения, если адаптер находится в одном из следующих состояний:

    • Создан

    • Выполняется

    • Остановлен

  • В процессе своей работы адаптер переходит между некоторыми или всеми из пяти состояний (создан, выполняется, приостановлен, остановка и остановлен). Смена состояние происходит до того, как сервер StreamInsight вызывает метод Start() или Resume(), и после того, как адаптер вызывает методы Enqueue(), Dequeue(), Ready() и Stopped().

  • Сервер StreamInsight и адаптер никогда не используют один и тот же поток. Сервер всегда вызывает метод Start() или Resume() в отдельном рабочем потоке. Сервер получает этот поток из пула потоков операционной системы от имени адаптера. Это означает, что методы Start() и Resume() обладают всеми возможностями и необходимой гибкостью в использовании рабочих потоков по мере необходимости (например, доступно порождение дополнительных потоков для асинхронных операций чтения или записи). Доступ к столь широким возможностям подразумевает необходимость внимательного использования системных ресурсов из этого потока. Желательно следовать рекомендациям.

  • API-интерфейс устраняет необходимость в собственной синхронизации между операциями (потоками) Start() и Resume(). Сервер всегда вызывает метод Resume() после того (и только после того), как адаптер вызывает метод Ready(). Однако нужно учитывать, что синхронизация может понадобиться для задач считывания, записи и буферизации событий, выполняемых с различными устройствами, особенно в сценариях асинхронного ввода-вывода. Рекомендуется использовать неблокируемый ввод-вывод.

  • Если адаптер может бездействовать, то он должен периодически проверять состояние, чтобы определить, получен ли запрос на остановку.

Жизненный цикл взаимодействия адаптера с сервером

Подтверждение связи между сервером StreamInsight и адаптером всегда выполняется синхронно, и адаптер на любом этапе выполнения может проверить состояние и отреагировать требуемым образом. Жизненный цикл взаимодействия адаптера с сервером StreamInsight состоит из следующих операций, которые соответствуют конечному автомату, показанному на предыдущем рисунке.

  • Создание

    Экземпляр адаптера начинает взаимодействие с сервером StreamInsight, когда запрос запускается (путем вызова соответствующего метода в API сервера StreamInsight).

  • Выполнение

    Сервер переводит адаптер в состояние «Выполняется» и асинхронным образом вызывает метод Start() для адаптера. При этом гарантируется, что данный вызов будет выполнен только один раз. Адаптер, находящийся в состоянии «Выполняется», может ставить события в очередь на сервер и выводить их из очереди.

    В идеальном случае адаптер чаще всего будет находиться в состоянии «Выполняется». Рекомендуемый шаблон проектирования предусматривает вызов подпрограммы чтения или записи, предпочтительно в отдельном потоке, из метода Start() и возвращение управления из подпрограммы Start() с быстрым освобождением рабочего потока.

    Подпрограмма чтения (допустим, с именем ProduceEvents()) считывает события из источника и вызывает метод Enqueue() для принудительной отправки событий на сервер. В случае выходного адаптера подпрограмма записи (допустим, с именем ConsumeEvents()) вызывает метод Dequeue() для запроса событий с сервера и записывает их в приемник.

  • Приостановлен

    Если серверу не удается получить событие, стоящее в очереди, или вывести событие из очереди, то входной или выходной адаптер переводится в состояние «Приостановлен». В этом состоянии вызовы методов Enqueue() и Dequeue() возвращают соответственно состояние FULL и EMPTY. В состоянии «Приостановлен» можно реализовать такие служебные операции, как сохранение позиции последней записи, считанной из базы данных, или строки в файле. В конце этого необязательного раздела необходимо вызвать метод Ready(), чтобы сообщить серверу о готовности адаптера к возобновлению. Если подпрограмма выполняется в одном рабочем потоке с подпрограммой Start(), необходимо возвращать управление из Start().

  • В ответ на вызов метода Ready() сервер возвращает адаптер в состояние «Выполняется» и всегда вызывает метод Resume() асинхронно в другом рабочем потоке. Метод Resume() можно создать так, чтобы ставить в очередь или выводить из очереди последнюю итерацию с ошибкой, а затем вызывать ProduceEvents() или ConsumeEvents(). Этот цикл может продолжаться, пока адаптер не перейдет в состояние «Остановлен» или «Остановка».

  • Остановка

    Если адаптер находится в состоянии «Выполняется» или «Приостановлен», то сервер в любой момент может перевести его в состояние «Остановка» в ответ на асинхронное требование остановить запрос. В этом состоянии вызов метода Enqueue() или Dequeue() также возвращает соответственно состояние FULL или EMPTY.

    Состояние «Остановка» представляет реализацию промежуточной области хранения адаптера для подготовки к верной остановке. Адаптер можно реализовать так, чтобы освобождались все полученные ресурсы (потоки, память), затем вызывался метод Stopped(). До вызова этого метода сервер не будет останавливать адаптер.

    Заметьте, что адаптер может переходить в состояние «Остановка» в асинхронном режиме. Адаптер должен иметь возможность обнаружить, что он перешел в состояние «Остановка». Как упоминалось ранее, шаблон проектирования предусматривает вызов метода Ready() для приостановленного адаптера. В ответ на это сервер снова вызывает метод Resume(), что позволяет обнаружить состояние «Остановка» в методе Resume(). Рекомендуется размещать проверку состояния «Остановка» первым блоком кода в реализации Start() и Resume().

  • Остановлен

    Метод Stopped() может вызываться из любого места в коде адаптера. В результате адаптер переходит в состояние «остановлен». С точки зрения оптимального проектирования рекомендуется освобождать ресурсы, полученные адаптером, перед вызовом метода Stopped().

    Важное примечаниеВажно!

    Если не вызвать метод Stopped(), то последняя страница памяти, связанная с запросом, не будет освобождена. В результате могут возникать небольшие утечки памяти, которые могут накапливаться со временем, если в процессе выполняется большое количество циклов запуска и остановки запроса.

    В состоянии «Остановлен» адаптер не может ссылаться на конструкции, относящиеся к серверу StreamInsight, или на память событий, а также не может выполнять операции постановки в очередь и вывода из очереди. Подобные действия вызывают исключение. Однако в этом состоянии могут продолжаться операции очистки, работающие с операционной системой и с устройствами.

Примеры

Примеры различных входных и выходных адаптеров, а также фабрик адаптеров см. в образцах, доступных на странице StreamInsight Samples (на английском языке).

См. также

Основные понятия

Основные понятия сервера служб StreamInsight

Архитектура сервера служб StreamInsight