将应用程序时间提前

StreamInsight 开发人员必须在需要可能具有无序数据的数据源和要求以高度灵活地方式处理事件之间进行权衡。尽管更快地将应用程序时间提前将有助于缩短滞后时间,但它将缩小晚到数据的窗口(也就是说,降低数据无序到达的能力)。StreamInsight 提供不同的方法来推断应用程序时间。本主题介绍可在适配器级别设置和使用查询绑定的将应用程序时间提前的不同级别和策略。

理解临时模型

StreamInsight 的临时模型仅基于应用程序时间,从不基于系统时间。这意味着所有临时运算符都会引用事件的时间戳,而永远不会引用主机的系统时钟。因此,应用程序必须将其当前应用程序时间传达给 StreamInsight 服务器。特定应用程序的应用程序时间取决于应用程序环境的许多不同方面。总之,应用程序开发人员负责为 StreamInsight 服务器提供合适的应用程序时间。应用程序时间的主要注意事项如下:

  • 数据源

    在数据源传输临时信息时,这些数据可用于标识已接收的数据源中所有事件的时间点。此时间点构成就此数据源而言的当前应用程序时间。请注意,不同的数据源可能会以不同的速度继续。

  • 无序数据

    对于某些数据源,事件并不始终按其时间戳的顺序到达。也就是说,这些数据是无序的。StreamInsight 可容纳无序数据并且确保结果不依赖于事件到达 StreamInsight 服务器的顺序。StreamInsight 开发人员可提前具有某些间隙的应用程序时间,从而对于具有晚到事件的那些数据源,允许无序事件慢慢到达。

  • 结果活动性

    StreamInsight 查询输出已知最高精确到当前应用程序时间的结果。这意味着在随着应用程序时间一点一点过去而最终得到结果,结果将从 StreamInsight 查询中浮现出来。

当前时间增量 (CTI)

在处理查询的过程中,应用程序时间由当前时间增量 (CTI) 事件驱动。CTI 是一种标点事件,是 StreamInsight 临时模型的中心组件。CTI 用于通过对 StreamInsight 服务器断言时间线的某些部分将不再更改,提交事件序列以及向查询输出释放计算结果。因此,将 CTI 与事件一起排入输入事件流队列以便生成任何结果和刷新有状态运算符的状态十分重要。

通过将 CTI 排入队列,输入可保证不会生成将影响此 CTI 时间戳之前的时段的任何后续事件。这意味着,将 CTI 排入输入队列后:

  • 对于形状为点、间隔或开始边缘的事件:该事件的“开始时间”需在 CTI 时点上或在 CTI 之后。

  • 对于形状为结束边缘的事件:该事件的“结束时间”需在 CTI 时点上或在 CTI 之后。

任何违反这些规则的情况都称为“CTI 冲突”。下文将介绍如何处理这些冲突。

有三种方法可用于将 CTI 插入输入流。

  1. 通过输入适配器以编程方式将 CTI 排入队列,此方法类似于将事件排入队列。

  2. 使用给定频率以声明方式生成 CTI。这可以在适配器工厂中或作为查询绑定的一部分通过 AdvanceTimeGenerationSettings 指定。

  3. 将单独的输入流定义为 CTI 源。这只能在查询绑定中指定。

只要实现方法 2 和 3,也必须实现针对 CTI 冲突的策略。在下面的部分中,将介绍 AdvanceTimeGenerationSettings 和冲突策略。后面的部分将描述如何在适配器工厂以及查询绑定中使用提前时间设置。

CTI 生成

CTI 的生成(见上述方法 2 和 3)具有两个维度:

  1. 生成频率,该频率将指定为正整数 N 或时间范围 T。生成频率策略在出现的事件计数 (N) 或时间范围 (T) 后插入 CTI。

  2. 生成的 CTI 的时间戳,它指定为针对最后接收的事件的延迟。

此外,您可以使用布尔标志以便指示具有正无穷的时间戳的最终 CTI 是否应在关闭查询时插入。这用于从查询的运算符刷新所有剩余事件。

通过类 AdvanceTimeGenerationSettings 定义 CTI 生成,其构造函数采用频率、延迟和标志,如下例中所示。

var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(5), true);

此示例指示引擎在来自事件源的每 10 个事件后插入一个 CTI。该 CTI 携带最后事件的时间减去 5 秒的时间戳。此延迟机制可以高效地实现宽限期,以便事件源不必违反 CTI 语义就可以将晚到事件排入队列(只要这些事件永远没有晚于超过 5 秒)。在相应的查询关闭时,具有不限制时间的 CTI 将排入队列。

请注意,在通过 AdvanceTimeSettings 为 CTI 生成指定频率时,不考虑结束边缘。在使用持续时间作为频率时也不考虑结束边缘。在边缘事件既针对频率又针对持续时间的情况下,只考虑开始边缘。

CTI 冲突策略

可以通过发送具有比插入的 CTI 更早的时间戳的事件,允许事件源违反 CTI 语义。提前时间设置允许指定策略来处理此类情况。该策略可以具有以下两个值:

  • 删除

    违反插入的 CTI 的事件将被删除并且不排入到查询队列中。

  • 调整

    违反插入的 CTI 的事件将被修改(如果其生存期与 CTI 时间戳重叠)。也就是说,这些事件的开始时间戳设置为最近的 CTI 时间戳,因此这些事件将生效。如果事件的开始时间和结束时间都处于 CTI 时间戳之前,则该事件将被删除。

适配器提前时间设置

可以在适配器工厂的定义中指定用于将应用程序时间提前的设置。采用与在实例化适配器时调用工厂的 Create() 方法的相同方式,调用用于定义适配器实例的提前时间设置的相应方法。为此,将接口 ITypedDeclareAdvanceTimeProperties 用于类型化适配器(对于非类型化适配器则为 IDeclareAdvanceTimeProperties),如下例所示。

public class MyInputAdapterFactory : ITypedInputAdapterFactory<MyInputConfig>,
                                     ITypedDeclareAdvanceTimeProperties<MyInputConfig>

此接口要求以下方法以便作为工厂的一部分实现。

public AdapterAdvanceTimeSettings DeclareAdvanceTimeProperties<TPayload>(MyInputConfig configInfo, EventShape eventShape)
{
    var atgs = new AdvanceTimeGenerationSettings(10, TimeSpan.FromSeconds(0), true);
    var ats = new AdapterAdvanceTimeSettings(atgs, AdvanceTimePolicy.Drop);
    return ats;
}

对于每个新实例化的适配器,使用与在相应 Create() 方法调用中指定的相同配置结构和事件形状参数调用方法 DeclareAdvanceTimeProperties()。这允许适配器作者不必要求查询编写器和查询绑定器知道提前时间设置的具体内容,即可从配置信息派生正确的 CTI 生成设置。

AdapterAdvanceTimeSettings 构造函数要求上述 AdvanceTimeGenerationSettings 对象和冲突策略。

查询绑定中的 CTI 生成

与 AdapterAdvanceTimeSettings 相似,可通过声明方式在查询绑定中指定发布 CTI,如下例中所示。这允许用户绑定查询以便定义独立于适配器实现的 CTI 应用程序时间行为。

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);

AdvanceTimeSettings 构造函数采用以下三个参数:

  1. AdvanceTimeGenerationSettings 对象

  2. AdvanceTimeImportSettings 对象

  3. 冲突策略

请注意,生成设置或导入设置参数可分别设置为 Null,但不能都设置为 Null。此外,可以一起指定它们。下一部分将介绍 AdvanceTimeImportSettings 类。

上面的示例指定将 CTI 与每个事件一起生成和插入,并且具有事件的时间戳(没有延迟)。AdvanceTimeSettings 对象可作为可选的最后一个参数传递到 CepStream.Create() 方法,如下例所示。

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromSeconds(0), true);
var ats = new AdvanceTimeSettings(atgs, null, AdvanceTimePolicy.Adjust);

var inputstream = CepStream<MyPayloadType>.Create("inputStream",
                                                  typeof(MyInputAdapterFactory),
                                                  new MyConfiguration(),
                                                  EventShape.Point,
                                                  ats);

它还可以用于查询绑定器开发模型:

queryBinder.BindProducer<MyPayloadType>("filterInput",
                                        inputAdapter,
                                        new MyConfiguration(),
                                        EventShape.Point,
                                        ats);

与其他流同步

在查询绑定过程中使用时,除了基于频率生成 CTI(而非替代 CTI),还可以通过使用 AdvanceTimeImportSettings 将它们从其他输入流复制到查询。此功能使两个流可以同步,如下例所示。

var dataStream = CepStream<DataType>.Create("dataStream ",
                                            typeof(DataInputAdapterFactory),
                                            new MyDataAdapterConfiguration(),
                                            EventShape.Point);

var ats = new AdvanceTimeSettings(null, new AdvanceTimeImportSettings("dataStream"), AdvanceTimePolicy.Adjust);

var lookupStream = CepStream<LookupType>.Create("lookupStream",
                                                typeof(ReferenceInputAdapterFactory),
                                                new MyReferenceConfiguration(),
                                                EventShape.Edge,
                                                ats);

var joined = from eLeft in dataStream
             join eRight in lookupStream
             where ...

此示例说明一个典型用例,其中,“快速”数据流必须与“慢速”引用流联接。慢速流可以是更改频率比快速流低得多的查找数据。为了使该联接生成输出的速度与其最快的输入一样快,慢速输入流通过导入其 CTI 与快速流同步。在这个示例中,快速流的应用程序时间处理认为是在适配器中发生的。

结果活动性

提前时间生成设置的延迟参数指定插入的 CTI 的时间戳。理解 CTI 在 StreamInsight 框架中的准确语义对于取得输出活动性的预期效果十分重要。CTI 对引擎断言,严格位于 CTI 的时间戳之前的时间线上的一切内容都将被提交。这会对结果的活动性造成不同影响。

例如,考虑一个点事件的输入流以及频率为 1(每个事件)且延迟为 0 的 CTI 生成设置。这将生成具有每个点事件的完全相同时间戳的 CTI。但是,这意味着非常后的点事件只能与下一个 CTI 一起提交,因为其时间戳并不严格位于相应 CTI 之前。为了在适配器一发布点事件后就尽快提交每个点事件,CTI 必须带有紧接在点事件之后的时间戳。这将转换为一个时钟周期的负延迟,如下例中所示。

var atgs = new AdvanceTimeGenerationSettings(1, TimeSpan.FromTicks(-1), true);

CTI 和查询运算符

CTI 由输入适配器排队或按上述内容注入。它们通过查询传播并且按照具体的运算符以不同方式处理。例如,联接运算符将释放其结果,一直释放到来自任一方的更早的 CTI。联合运算符释放来自任一方的最新 CTI 中更早的结果。整个查询将只将其结果释放到最新的 CTI。

另一方面,某些运算符对 CTI 时间戳有影响。跳跃窗口将窗口内的 CTI 拉回到窗口的开头,因为在事件仍处于该窗口时基于该窗口的操作的结果可能会变化。ShiftEventTime() 和 AlterEventLifeTime() 方法都会更改事件的开始时间,并且相同的转换将应用于 CTI。

请参阅

概念

创建输入和输出适配器

StreamInsight 服务器概念

更改历史记录

更新的内容

添加了“CTI 和查询运算符”部分。

在“CTI 生成”部分中添加了信息,指出在通过 AdvanceTimeSettings 指定 CTI 频率时不考虑结束边缘。