用户定义的流运算符

通过用户定义的流运算符,您可以定义事件流的自定义处理。

使用模式

在查询内,可通过使用 CepStreamScan 扩展方法,调用用户定义的流运算符。您为该运算符提供输入流和初始状态,如下例中所示。

var query = input.Scan(new SmoothingOperator(0.7));

运算符的作者从抽象 CepPointStreamOperatorCepEdgeStreamOperator 类型派生一个新类。这个新类将封装该运算符的状态机。将对此类型构造函数的调用传递给扫描运算符,以便为该运算符建立初始状态。

用户定义的流运算符的特性

通过用户定义的流运算符,用户可以过程方式与事件流交互。因此,该运算符表示 StreamInsight 事件处理引擎和 CLR 之间的边界,与适配器和用户定义的运算符或聚合相似。在上述所有情况下,引擎以及开发人员都在履行与流的临时属性有关的约定。StreamInsight 引擎确保用户定义的流运算符满足以下条件,以便对于特定序列的查询输入事件其行为是确定性的:

  • 用户定义的流运算符必须接收按其同步时间(针对点和开始边缘事件的开始时间,针对结束边缘事件的结束时间)排序的事件。不支持间隔事件,因为间隔事件不具有直接的按同步时间排序的表示形式,例如每个事件都指示两个时间点 – 一个开始时间点和一个结束时间点。

  • 只有插入事件将传递到用户定义的流运算符。传入流中的当前时间增量事件 (CTI) 对于用户定义的流运算符而言是透明的,但它们仍确定用户定义的流运算符如何觉察时间的推移(请参阅下文的 NextCti)。

  • 根据是否允许,用户定义的流运算符可由 StreamInsight 停用(请参阅下文的 IsEmpty)。已停用的用户定义的流运算符可由 StreamInsight 回收。

  • 每个插入事件都将导致调用 ProcessEvent,之后将轮询 NextCti 和 IsEmpty 属性。

用户定义的流运算符的输入和输出

用户定义的流运算符一次处理一个输入事件。为响应每个输入事件,它可以生成 0-* 个输出事件。为响应输入,该运算符还可能会更新其内部状态。输入事件可以是 CTI(在请求运算符时生成以便指示时间的推移)或插入。会暂时给输入加标注。

与输入事件相反,输出事件只不过是事件负载。无法给输出事件加时间戳或将 CTI 注入到输出流中。输出事件作为点事件生成,其时间戳基于相应输入事件的时间戳。

在用户定义的流运算符中处理时间

在您创建新的用户定义的流运算符时,您的代码必须仅处理事件的负载。时间由 StreamInsight 专门处理。输入事件是按顺序接收的。每个输出事件的时间戳都基于相应输入事件的时间戳。例如,如果某一边缘结束事件触发一个输出事件,然后该输出事件将接收该边缘结束事件的时间戳。因此,该运算符可受到时间的影响,但无法对它进行控制。

用户定义的流运算符不会从其 ProcessEvent() 方法中的输入流直接接收 CTI,但它能够对通过 NextCti 属性对时间的推移作出反应。此属性在每次调用 ProcessEvent() 后由引擎轮询。用户定义的流运算符可返回一个时间戳,该时间戳指示它将作为对 ProcessEvent() 的调用接收的下一个 CTI 时间戳。

只有已通过设置 NextCti 属性请求的那些 CTI 才将传递到 ProcessEvent。这些 CTI 将不会传播到用户定义的流运算符之外。

实现用户定义的流运算符

若要创建新的用户定义的流运算符,请从抽象 CepPointStreamOperatorCepEdgeStreamOperator 基类派生一个新类。

  • 如果您从抽象 CepPointStreamOperator 基类派生,则运算符会将输入事件视作点事件。但是,如果事件事实上并非点事件,这也不算是个错误。运算符将仅看到其开始时间。

  • 如果您从抽象 CepEdgeStreamOperator 基类派生,则运算符将会看到输入事件的开始和结束边缘。

在您的派生类中,您将覆盖以下属性和方法:

  • ProcessEvent 方法。生成输出并更新运算符的内部状态,以便响应各输入事件。 ProcessEvent 接收一个输入事件并且可以返回零个或多个输出负载。

  • IsEmpty 属性。指示运算符的内部状态是否为空。在为 true 时,StreamInsight 查询引擎可能会放弃运算符实例以降低内存使用率。

  • 或者,将采用 NextCti 方法。指示 CTI 事件将发送到运算符的下一个时间点。通过覆盖此属性,可以让用户定义的运算符在将来的特定时间点生成输出,或者在经过某个应用程序时间间隔后指示其内部状态为空。

派生类也必须实现 WCF 序列化。有关详细信息,请参阅如何为类或结构创建基本数据约定

StreamInsight 引擎是如何与运算符交互的

对于该运算符的每个实例,通过按同步时间顺序的事件调用 ProcessEvent 方法。对于点事件或 CTI,该同步时间是有效的开始时间。对于边缘时间,该同步时间是开始边缘的有效开始时间或者结束边缘的有效结束时间。

在每次调用 ProcessEvent 方法后,将轮询 IsEmptyNextCti 属性。

在运算符覆盖 NextCti 时,该引擎将确保运算符处理的下一个事件或者是其同步时间小于 NextCti 的值的插入事件,或者是 NextCti 的值作为其开始时间的 CTI。如果运算符返回小于或等于上次处理的事件的同步时间的 NextCti 值,则该值将被忽略。NextCti 属性允许运算符将输入流中时间的进度“转换为”它自己的节奏(以这些内部 CTI 的形式),然后相应对此进度作出响应。

运算符将被激活,以便仅响应插入事件。CTI 不触发激活。在它从 IsEmpty 返回 true 时,运算符将被停用。

在任何点,引擎都可以选择序列化运算符并且释放它对运算符的引用。在以后反序列化该运算符时,则应该从其停止处开始。

用户定义的流运算符的示例

指数平滑

这一用户定义的流运算符将点事件流视作一个值的序列并且应用指数平滑。请注意,需要对 System.Runtime.Serialization 的引用。

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Implements exponential smoothing.
/// </summary>
[DataContract]
public sealed class SmoothingOperator : CepPointStreamOperator<double, double>
{
    [DataMember]
    readonly double _smoothingFactor;

    [DataMember]
    double? _previousValue;

    public SmoothingOperator(double smoothingFactor)
    {
        _smoothingFactor = smoothingFactor;
    }

    public override IEnumerable<double> ProcessEvent(PointEvent<double> inputEvent)
    {
        // The result is a function of the previous result and the current input value.
        _previousValue = _previousValue.HasValue
            ? (1.0 - _smoothingFactor) * _previousValue.Value + _smoothingFactor * inputEvent.Payload
            : inputEvent.Payload;

        yield return _previousValue.Value;
    }

    public override bool IsEmpty
    {
        get { return false; }
    }

模式匹配

这个简单的模式匹配示例说明如何交替使用 IsEmptyNextCti。在这个例子中,运算符将查找值为 1.0 的事件并且该事件在 30 秒内不后随值为 2.0 的事件。(提供这个示例是为了说明用户定义的流运算符中的有用概念。在实际应用中,此模式十分简单,足以通过使用 StreamInsight 中的内置运算符实现。)

以前的示例使用了 NextCti 来控制运算符的生存期。此示例还为此目的使用了 NextCti,但此外,它还使用 NextCti 来生成输出,以便对时间的推移作出响应。

namespace UdsoExamples
{
    using System;
    using System.Collections.Generic;
    using System.Runtime.Serialization;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Extensibility;
 
/// <summary>
/// Indicates when an event with value '1' is followed by an event with value '2'
/// within thirty seconds.
/// </summary>
[DataContract]
public sealed class SimplePatternMatcher : CepPointStreamOperator<int, DateTime>
{
    [DataMember]
    DateTimeOffset? _nextCti;

    [DataMember]
    // Tracks timestamps for all events with value '1'.
    readonly Queue<DateTimeOffset> _active = new Queue<DateTimeOffset>();

    public override bool IsEmpty
    {
        // The operator is empty when we are not tracking any events with value '1'.
        get { return _active.Count == 0; }
    }

    public override DateTimeOffset? NextCti
    {
        get { return _nextCti; }
    }

    public override IEnumerable<DateTime> ProcessEvent(PointEvent<int> inputEvent)
    {
        // Produce output in response to the passage of time. Any active '1' event
        // not matched by a '2' event within thirty seconds matches the pattern.
        while (_active.Count > 0 && _active.Peek().AddSeconds(30) <= inputEvent.StartTime)
        {
            yield return _active.Dequeue().UtcDateTime;
        }

        // Update operator state based on new input event.
        if (inputEvent.EventKind == EventKind.Insert)
        {
            if (inputEvent.Payload == 1)
                _active.Enqueue(inputEvent.StartTime);
            else if (inputEvent.Payload == 2)
                _active.Clear();

        }

        // Schedule wake-up after thirty seconds so that we can produce output
        // if needed.
        if (_active.Count > 0)
        {
            _nextCti = _active.Peek().AddSeconds(30);
        }
    }
}
}

定义帮助器方法以便简化使用

您可能想要简化在查询中对运算符的使用。例如,查询作者撰写 input.Smooth(0.5) 可能比撰写 input.Scan(new SmoothingOperator(0.5)) 更方便。

您可以通过创建如下自定义扩展方法,实现这个简化的模式:

        static CepStream<EventType1> Smooth(this CepStream<EventType1> source, double smoothingFactor)
        {
            if (null == smoothingFactor)
            {
                throw new ArgumentNullException("source");
            }

            return source.Scan(new SmoothingOperator(smoothingFactor));
        }