Поделиться через


Обработка времени приложения

Разработчики StreamInsight должны найти баланс между потребностями источников данных, которые могут содержать неупорядоченные данные, и требованиями к оперативной обработке событий. С одной стороны, более быстрая обработка времени приложения помогает сократить задержку, с другой, уменьшается запас времени для запаздывающих данных (повышается вероятность потери этих данных). StreamInsight предоставляет различные способы для обработки времени приложения. В этом разделе описываются различные уровни и политики обработки времени приложения, которые можно задавать на уровне адаптера и с привязками запроса.

Основные сведения о временной модели

Временная модель StreamInsight основана только на времени приложения и не учитывает системное время. Это значит, что все временные операторы ссылаются на отметки времени событий и не работают с системными часами компьютера. В результате приложения должны передавать свое текущее время на сервер StreamInsight. Время для заданного приложения зависит от многих аспектов в контексте приложения. В конечном счете за передачу правильного времени приложения на сервер StreamInsight отвечает разработчик приложения. Далее приведены основные моменты работы со временем приложения.

  • Источники данных

    Когда источники данных передают сведения о времени, по этим данным можно определить момент времени, когда получены все события из источника данных. Данный момент времени представляет текущее время приложения относительно этого источника данных. Обратите внимание, что различные источники данных могут работать с различной скоростью.

  • Неупорядоченные данные

    События из некоторых источников данных не всегда поступают в порядке, указанном отметками времени. Таким образом, данные оказываются неупорядоченными. StreamInsight поддерживает обработку неупорядоченных данных и гарантирует, что результаты не будут зависеть от порядка поступления событий на сервер StreamInsight. Разработчики StreamInsight могут обрабатывать время приложения с небольшой задержкой, чтобы неупорядоченные события из источников данных с запаздывающими событиями успели прибыть.

  • Актуальность результатов

    Запросы StreamInsight выводят результаты, достоверные с точностью до текущего времени приложения. Это значит, что результаты передаются из запросов StreamInsight по их завершении в процессе работы общего приложения.

Увеличение текущего времени (CTI)

Во время обработки запросов время приложения управляется событиями увеличения текущего времени (CTI). Специальное событие CTI является центральным компонентом временной модели StreamInsight. События CTI служат для фиксации последовательностей событий и вывода вычисленных результатов в выходные данные запроса. Они гарантируют серверу StreamInsight, что определенные части временной шкалы больше не будут изменяться. Поэтому важно ставить события CTI в очередь потока входных событий вместе с обычными событиями, чтобы получить результаты и записать на диск состояние операторов, сохраняющих состояние.

Если событие CTI помещено в очередь, последующие события, которые могли бы повлиять на период перед отметкой времени CTI, создаваться не будут. Это означает, что после помещения CTI в очередь потока входящих событий:

  • Для начала точечных, интервальных или граничных событий: Время начала события должно совпадать со временем CTI или следовать за ним.

  • Для окончания граничных событий: Время окончания события должно совпадать со временем CTI или следовать за ним.

Если эти правила не выполняются, говорят о нарушении CTI. Ниже описаны способы обработки этих нарушений.

События CTI вставляются во входной поток тремя способами.

  1. Программная постановка событий CTI в очередь через входной адаптер, аналогично постановке обычных событий в очередь.

  2. Декларативное создание событий CTI с заданной частотой. Задается в объекте AdvanceTimeGenerationSettings фабрики адаптера или в составе привязки запроса.

  3. Определение отдельного входного потока в качестве источника CTI. Может задаваться только в привязке запроса.

Если реализуются способы 2 и 3, то также необходимо реализовать политику для нарушений CTI. В следующем разделе описан класс AdvanceTimeGenerationSettings и политики нарушений. В последующих разделах описывается использование параметров обработки времени в фабрике адаптера и в привязке запроса.

Создание CTI

Создание событий CTI (описанное ранее в способах 2 и 3) ведется по двум измерениям.

  1. Частота создания, которая указывается положительным целым числом N либо в виде интервала времени T. Политика частоты создания вставляет событие CTI каждые N событий или через интервал времени T.

  2. Отметка времени создаваемых событий CTI, которая задается в виде задержки относительно последнего полученного события.

Кроме того, можно использовать логический флаг, который показывает, должно ли вставляться конечное событие CTI с отметкой времени «плюс бесконечность», когда запрос завершает работу. Этот флаг используется для записи на диск всех оставшихся событий из операторов запроса.

Создание CTI определяется в классе AdvanceTimeGenerationSettings, конструктор которого принимает частоту, задержку и флаг, как показано в следующем примере.

var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(5), true);

В этом примере системе сообщается, что событие CTI должно вставляться через каждые 10 событий, поступающих из источника события. CTI имеет отметку времени последнего события минус пять секунд. Такой механизм задержки фактически реализует отсрочку, позволяющую источнику события ставить в очередь запаздывающие события, не нарушая семантику CTI (при условии, что запаздывание событий не превышает пять секунд). Когда соответствующий запрос завершает работу, в очередь будет поставлено событие CTI с бесконечным временем.

Обратите внимание, что при настройке частоты создания CTI с помощью объекта AdvanceTimeSettings конечные границы не учитываются. Они также не учитываются, если вместо частоты используется длительность. Начальные границы учитываются, только если они являются граничными событиями как для частоты, так и для длительности.

Политики нарушения CTI

Источник события может нарушать семантику CTI, отправляя события с отметкой времени раньше, чем у вставленных CTI. Параметры обработки времени позволяют задавать политику для обработки таких ситуаций. Политика может иметь одно из двух значений:

  • Drop

    События, нарушающие вставленные CTI, удаляются и не ставятся в очередь запроса.

  • Исправление

    События, которые нарушают вставленные CTI, изменяются, если их время существования перекрывается с отметкой времени CTI. Это значит, что отметка времени начала событий устанавливается в значение отметки времени последнего CTI, для которого данные события будут допустимыми. Если и время начала, и время окончания события расположены до отметки времени CTI, то такое событие удаляется.

Параметры обработки времени в адаптере

Параметры обработки времени приложения можно указать в определении фабрики адаптера. Аналогично тому, как при каждом создании экземпляра адаптера вызывается метод Create() фабрики, вызывается соответствующий метод для определения параметров обработки времени в экземпляре адаптера. Для этого используется интерфейс ITypedDeclareAdvanceTimeProperties для типизированного адаптера (или интерфейс IDeclareAdvanceTimeProperties для нетипизированного адаптера), как показано в следующем примере.

public class MyInputAdapterFactory : ITypedInputAdapterFactory<MyInputConfig>,
                                     ITypedDeclareAdvanceTimeProperties<MyInputConfig>

Для этого интерфейса необходимо, чтобы в составе фабрики был реализован следующий метод.

public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(MyInputConfig configInfo, EventShape eventShape)
{
    var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(0), true);
    var ats = new AdapterAdvanceTimeSettings(atgs, AdvanceTimePolicy.Drop);
    return ats;
}

Метод DeclareAdvanceTimeProperties() вызывается для каждого нового создаваемого экземпляра адаптера с той же структурой конфигурации и параметром формы события, которые указаны в соответствующем вызове метода Create(). Это позволяет автору адаптера получать правильные параметры создания CTI из сведений конфигурации, а пользователи, выполняющие создание и привязку запроса, не должны учитывать параметры обработки времени.

Для конструктора AdapterAdvanceTimeSettings требуется объект AdvanceTimeGenerationSettings и политика нарушений, описанные ранее.

Создание CTI в привязке запроса

Аналогично AdapterAdvanceTimeSettings, создание CTI можно декларативно задать в привязке запроса, как показано в следующем примере. Это позволяет пользователю, выполняющему привязку запроса, определять порядок обработки времени приложения CTI независимо от реализации адаптера.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);

Конструктор AdvanceTimeSettings принимает следующие три аргумента:

  1. объект AdvanceTimeGenerationSettings;

  2. объект AdvanceTimeImportSettings;

  3. политику нарушения.

Учтите, что один из аргументов с параметрами создания или параметрами импорта может иметь значение NULL, но не оба аргумента одновременно. Кроме того, эти аргументы можно задавать вместе. В следующем разделе описывается класс AdvanceTimeImportSettings.

В приведенном выше примере указывается, что CTI создается и вставляется с каждым событием и имеет отметку времени, совпадающую с отметкой времени события (без задержки). Объект AdvanceTimeSettings можно передавать в качестве последнего необязательного параметра в метод CepStream.Create() , как показано в следующем примере."Пример:"

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);

var inputstream = CepStream<MyPayloadType>.Create("inputStream",
                                                  typeof(MyInputAdapterFactory),
                                                  new MyConfiguration(),
                                                  EventShape.Point,
                                                  ats);

Его также можно использовать в модели разработки для модуля привязки запроса.

queryBinder.BindProducer<MyPayloadType>("filterInput",
                                        inputAdapter,
                                        new MyConfiguration(),
                                        EventShape.Point,
                                        ats);

Синхронизация с другим потоком

Если события CTI используются во время привязки запроса, то, помимо создания их с определенной частотой (или вместо этого), их можно копировать в запрос из другого входного потока с помощью AdvanceTimeImportSettings. Такая возможность позволяет синхронизировать два потока, как показано в следующем примере.

var dataStream = CepStream<DataType>.Create("dataStream ",
                                            typeof(DataInputAdapterFactory),
                                            new MyDataAdapterConfiguration(),
                                            EventShape.Point);

var ats = new AdvanceTimeSettings(null, new AdvanceTimeImportSettings("dataStream"), AdvanceTimePolicy.Adjust);

var lookupStream = CepStream<LookupType>.Create("lookupStream",
                                                typeof(ReferenceInputAdapterFactory),
                                                new MyReferenceConfiguration(),
                                                EventShape.Edge,
                                                ats);

var joined = from eLeft in dataStream
             join eRight in lookupStream
             where ...

В этом примере показан типичный случай использования, где «быстрый» поток данных необходимо соединить с «медленным» эталонным потоком. Медленный поток может представлять данные для подстановки, которые меняются намного реже, чем быстрый поток. Чтобы сделать результат соединения таким же быстрым, как самый быстрый из входов соединения, медленный входной поток синхронизируется с быстрым потоком, импортируя его события CTI. В этом примере предполагается, что обработка времени приложения в быстром потоке выполняется в адаптере.

Актуальность результатов

Параметр задержки в параметрах обработки времени указывает отметку времени вставляемых событий CTI. Чтобы добиться необходимого уровня актуальности выходных данных, важно полностью представлять семантику событий CTI на платформе StreamInsight. Событие CTI гарантирует подсистеме, что все события на временной шкале, расположенные строго раньше отметки времени CTI, уже зафиксированы. Это имеет различные последствия для актуальности результатов.

Например, пусть имеется входной поток точечных событий и задано создание CTI с частотой 1 (каждое событие) и задержкой 0. В результате создаются CTI, имеющие отметки времени, совпадающие с каждым точечным событием. Однако в этом случае последнее точечное событие будет фиксироваться только со следующим CTI, поскольку его отметка времени не находится строго раньше соответствующего CTI. Чтобы каждое точечное событие фиксировалось сразу после выхода из адаптера, события CTI должны иметь отметки времени, расположенные непосредственно за отметкой времени точечных событий. В результате формируется отрицательная задержка в один такт, как показано в следующем примере.

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromTicks(-1), true);

События CTI и операторы запросов

События CTI помещаются в очередь входным адаптером или вставляются способом, описанным выше. Они распространяются с помощью запроса и обрабатываются другим способом специальными операторами. Например, операторы соединения выдают свои результаты вплоть до старых событий CTI с любой стороны. Операторы Union выдают самый старый результат из последних событий CTI с любой стороны. Весь запрос выдаст результат только вплоть до последнего события CTI.

С другой стороны, некоторые операторы влияют на отметки времени события CTI. «'Прыгающие» окна смещают события CTI внутри окна к началу окна, поскольку результат операции в верхней части окна может измениться, пока события продолжают поступать в это окно. Методы ShiftEventTime() и AlterEventLifeTime() изменяют время начала событий, и такое же преобразование будет применяться к событиям CTI.

См. также

Основные понятия

Создание входных и выходных адаптеров

Основные понятия сервера служб StreamInsight

Журнал изменений

Обновленное содержимое

Добавлен раздел «События CTI и операторы запросов».

В раздел «Создание событий CTI» добавлены сведения о том, что конечные границы не учитываются при задании частоты CTI с помощью объекта AdvanceTimeSettings.