用户定义聚合和运算符

本主题中的示例演示如何使用用户定义聚合 (UDA) 和用户定义运算符 (UDO) 来扩展 StreamInsight LINQ 运算符中的基于窗口的运算集。这些扩展插件是对事件窗口定义的,并且返回零个或多个结果事件。用户定义聚合或用户定义运算符必须编译到某一程序集中,而 StreamInsight 服务器必须可通过在运行时提供和使用适配器的相同方式来访问该程序集。

StreamInsight 还将用户定义流运算符作为其他扩展性机制提供。用户定义流运算符是直接对事件流定义的,而非对事件窗口定义的。

用户定义聚合

用户定义聚合基于窗口规范使用,可对该窗口中的事件进行聚合并生成单个结果值。UDA 将其输入作为包含一组 CEP 事件的 CEP 窗口(这是跳跃窗口、快照窗口或者基于计数的窗口运算符的结果),将输出作为单个返回值(映射到 StreamInsight Primitive 类型之一的 CLR 类型)。有关窗口的详细信息,请参阅使用事件窗口

与 StreamInsight 提供的类似 count、sum 和 average 的更简单聚合相比,您可以实现在功能上更复杂的 UDA。在后面的部分中将论述一个此类示例,即计算时间加权平均值。

用户定义运算符

用户定义运算符基于窗口规范使用,可对该窗口中的事件进行处理并生成一个或多个得到的事件。UDO 将其输入作为包含一组 CEP 事件的 CEP 窗口(这是跳跃窗口、快照窗口或计数窗口运算符的结果),将其输出作为一组 CEP 事件或一组 CEP 负载。

当需要为每个窗口生成或影响整个事件(包括其时间戳)的计算时,可以使用 UDO。例如,除了计算聚合之外,还设置事件的状态字段,其中状态取决于聚合结果和另一个参数。例如,UDO 可能为包含一个负载字段(包含聚合结果)和一个状态字段(指示聚合结果是否违反了某个约束)的每个窗口生成单个事件。

UDA 和 UDO 中的时间敏感度

您可以基于为实现这些运算符而选择的基类,将 UDA 和 UDO 定义为对时间不敏感或对时间敏感。

不应向对时间不敏感的 UDA 和 UDO 传递整个事件(包括其时间戳)。它们而是只考虑来自定义的窗口的一个或多个负载字段。而且,不会向它们传递当前窗口开始和结束时间。

对于每个窗口,将向时间敏感的 UDA 和 UDO 传递一组事件,包括其时间戳以及窗口开始时间和结束时间。UDA 或 UDO 是否对时间敏感取决于 UDA 或 UDO 作者从中派生实现的各自基类。

实现用户定义的聚合

UDA 作者的责任包括:

  • 提供 UDA 的实际实现。

  • 提供 LINQ 的扩展方法以允许查询作者使用 UDA。

若要实现 UDA,用户需要从适当的基类来派生:CepAggregate(对于对时间不敏感的 UDA)或 CepTimeSensitiveAggregate(对于对时间敏感的 UDA)。

类派生需要输入和输出类型参数的实例化。输入类型表示整个负载(如果 UDA 需要能够在其计算过程中查看整个负载字段集)或映射到 StreamInsight 类型系统中相应的 Primitive 类型的 CLR 类型(在单独字段是对 UDA 的输入的情况下)。这两种情况下的输出类型必须是映射到相应的 Primitive 类型的 CLR 类型。

除了事件数据之外,查询开始时的可选配置结构也可以传递到 UDA 类的构造函数(如果 UDA 作者预期这样)。如果 UDA 作者提供了此类构造函数,引擎将会使用 LINQ 中 UDA 的调用方提供的配置在运行时相应调用它。

对时间不敏感和对时间敏感的 UDA 都会将负载作为未经排序的集接收。对于对时间敏感的 UDA,事件的时间戳还另外与各负载相关联。此外,定义窗口开始和结束时间的窗口描述符将传递到 UDA。

用户定义聚合示例

下面的示例实现对时间不敏感的 UDA。它应该有一组整数事件字段。不为实现此示例指定可选的配置结构,因此,该类不需要特定的构造函数。

public class Median : CepAggregate<int, int>
{
    public override int GenerateOutput(IEnumerable<int> eventData)
    {
        var sortedData = eventData.OrderBy(e => e.Payload);
        int medianIndex = (int)sortedData.Count() / 2;
        return sortedData.Count() % 2 == 0 ?
            (sortedData.ElementAt(medianIndex).Payload + sortedData.ElementAt(medianIndex - 1).Payload) / 2 :
            sortedData.ElementAt(medianIndex).Payload;
    }
}

除了实现 UDA,您还必须提供 LINQ 的扩展方法才能允许查询编写者使用 UDA。扩展方法是一个签名,它使查询编写者能够使用聚合并编译查询。通过某个属性,StreamInsight LINQ 提供程序可以引用包含 UDA 实现的实际类,如下面的示例所示。

public static class MyUDAExtensionMethods
{
    [CepUserDefinedAggregate(typeof(Median))]
    public static int Med<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, int>> map)
    {
           throw CepUtility.DoNotCall();
    }
}

在这里,必须通过 Median 类实现 UDA,该类实现对 int 类型的单个字段进行操作的 UDA 并且返回 int 类型的值。函数签名中的表达式表示从输入流的事件类型到单个整数值的映射。请注意,扩展方法从不会执行,因此,CepUtility.DoNotCall() 包含在自身中。基于此规范,UDA 可用在 LINQ 中,如下面的示例所示。

from w in s. TumblingWindow(TimeSpan.FromMinutes(10))
select new { f1 = w.Med(e => e.val) }

lambda 表达式参数将事件负载映射到将是对 UDA 的输入的整数值。在这个例子中,将为每个窗口计算事件字段 val 的值的中值。

接下来,将以具有配置信息的“对时间不敏感”的 UDA 为例。它需要整个 Trade 类型的负载作为输入并且返回 double 类型的值。该示例还包含相应的扩展方法:

    public class Trade
    {
        public double Volume { get; set; }
        public double Price { get; set; }
    }

    public class Vwap : CepAggregate<Trade, double>
    {
        double weight;

        /// <summary>
        /// Constructor for parameterized UDA
        /// </summary>
        public Vwap(double w)
        {
            weight = w;
        }

        public override double GenerateOutput(IEnumerable<Trade> events)
        {
            double vwap = events.Sum(e => e.Price * e.Volume) / events.Sum(e => e.Volume);

            return vwap * weight;
        }
    }

    static public partial class UDAExtensionMethods
    {
        [CepUserDefinedAggregate(typeof(Vwap))]
        public static double vwap(this CepWindow<Trade> window, double w)
        {
            throw CepUtility.DoNotCall();
        }
    }

因为整个负载将是输入,所以,扩展方法将不指定任何 lambda 表达式。UDA 的唯一参数是用于配置的值(在此处为 double 类型):

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))
             select new { f1 = w.vwap(2.5) }

接下来,将以具有配置信息的“对时间敏感”的 UDA 为例。该 UDA 是具有解释为阶跃函数的间隔事件的时间加权平均值(也就是说,每个间隔都在下一个之前有效)。与前面的例子相似,它不需要整个负载作为输入,而只需要 double 类型的值。

请注意,即使事件负载限制为 double 值,输入集仍定义为一组间隔事件,而不是像对时间不敏感的 UDA 的例子那样定义为一组负载。这是包括时间戳所需的,因为 UDA 指定为对时间敏感。此外,窗口本身以具有开始时间和结束时间属性的 WindowDescription 对象的形式提供。这些时间戳按 UTC 时间指定。还要注意,UdaConfig 是必须可通过 DataContractSerializer 序列化的类或结构。

public class TimeWeightedAverage : CepTimeSensitiveAggregate<double, double>
{
    UdaConfig _udaConfig;
    public TimeWeightedAverage(UdaConfig udaConfig)
    {
        _udaConfig = udaConfig;
    }

    public override Output GenerateOutput(IEnumerable<IntervalEvent<double>> events,
                                          WindowDescriptor windowDescriptor)
    {
        double avg = 0;
        foreach (IntervalEvent<double> intervalEvent in events)
        {
            avg += intervalEvent.Payload * (intervalEvent.EndTime - 
                                            intervalEvent.StartTime).Ticks;
        }
        avg = avg / (windowDescriptor.EndTime - 
                     windowDescriptor.StartTime).Ticks;
        return avg * udaConfig.Multiplier;
    }
}

其中,UDAConfig 是

public class UDAConfig
{
    public double Multiplier { get; set; }
}

该扩展方法现在还包括以下配置结构:

[CepUserDefinedAggregate(typeof(TimeWeightedAverage))]
public static double twa<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, double>> map, UdaConfig config)
{
    throw CepUtility.DoNotCall();
}

该配置将成为扩展方法中的另一个参数:

var result = from w in s.TumblingWindow(TimeSpan.FromMinutes(10))
         select new w.TimeWeightedAverage (e => e.dval,
                            new UdaConfig(){ Multiplier = 5 }); 

上述示例都考虑的是事件被类型化的情况。也就是说,在实现 UDA 时负载类型已知。下面的示例实现一个 UDA,它具有输入类型仅在运行时传递到该 UDA 中的泛型输入类型。

public class GenericInputUda<TInput> : CepAggregate<TInput, bool>
{
    public GenericInputUda(SampleUdaConfig config)
    {
        // ...
    }

    public override bool GenerateOutput(IEnumerable<TInput> payloads)
    {
        // ...
    }
}

实现用户定义运算符

UDO 作者的责任包括:

  • 提供 UDO 的实际实现。

  • 提供 LINQ 的扩展方法以允许查询编写者使用 UDO。

要实现 UDO,用户需要从适当的基类来派生:CepOperator(对于对时间不敏感的 UDO)或 CepTimeSensitiveOperator。类派生需要输入和输出类型参数的实例化。输入类型始终表示整个负载。输出类型可以是一组负载或一组事件,具体取决于所选的基类。

除了事件数据之外,您还可以将查询开始时的可选配置结构传递到 UDO 类的构造函数(如果 UDO 作者想要这样做)。如果 UDO 作者提供了某一构造函数,引擎将会使用 LINQ 中 UDO 的调用方提供的配置在运行时相应调用该构造函数。

对时间不敏感和对时间敏感的 UDO 都会将负载作为未经排序的集接收。对于对时间敏感的 UDO,事件的时间戳还另外与各负载相关联。此外,定义窗口开始和结束时间的窗口描述符将传递到 UDO。

用户定义运算符中的 CTI 行为

UDO 将通过以下方式更改当前时间增量 (CTI):当某个窗口仍“打开”(即在窗口结束时间后已收到带时间戳的 CTI)时,位于该窗口中的所有 CTI 都将更改为窗口开始时间。这样可确保只要窗口仍处于打开状态,UDO 的输出(可能潜在包含用户定义的时间戳)就会发生更改。

示例用户定义的运算符实现

下面的示例实现不具有配置信息的对时间不敏感的 UDO。

public class SampleUDO : CepOperator<Input, Output>
{
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)
    {
        Output output = new Output();
        output.total = 0;
        output.status = "good";

        foreach (Input payload in payloads)
        {
            output.total += payload.Value;
            if (payload.Flag == 4)
            {
                output.status = "bad";
                break;
            }
        }
        List<Output> outputCollection = new List<Output>();
        outputCollection.Add(output);
        return outputCollection;
    }
}

下面的示例显示如何对接受配置信息的对时间敏感的 UDO 更改签名。

public class GenericOutputUdo: CepTimeSensitiveOperator<InputEventType, TOutput>
{
    public GenericOutputUdo(SampleUdoConfig config)
    {
        ...
    }

    public override IEnumerable<IntervalEvent<TOutput>> GenerateOutput(
                             IEnumerable<IntervalEvent<InputEventType>> payloads,
                             WindowDescriptor windowDescriptor)
    {
        ...
    }
}

用户定义运算符的扩展方法的示例

除了实现 UDO,UDO 作者还必须提供 LINQ 的扩展方法才能允许查询编写者使用 UDO。扩展方法是一个签名,它使查询编写者能够使用运算符并编译查询。通过某个属性,LINQ 提供程序可以引用包含 UDO 实现的实际类,如下面的示例所示。

[CepUserDefinedOperator(typeof(SampleUDO))]
public static OutputEventType MyUDO(this CepWindow<InputEventType> window)
{
    throw CepUtility.DoNotCall();
}

现在,可以按照以下方式使用 UDO。

var newstream = from w in inputStream.Snapshot()
                select w.MyUDO();

下面的示例演示扩展方法以及具有配置结构的 UDO 的使用,它引用在名为 SampleUDOwithConfig 的类中包含的实现。

[CepUserDefinedOperator(typeof(SampleUDOwithConfig))]
public static OutputEventType MyUDO(this CepWindow<InputEventType> window, UDOConfig config)
{
    throw CepUtility.DoNotCall();
}

var newstream = from w in inputStream.SnapshotWindow()
                select w.MyUDO(new UDOConfig());

特定于区域性的事件字段属性

UDO、UDA 和 UDF 之类的扩展插件可被视作具有其类型系统的 CEP 域和 .Net CLR 之间的接口。对于某些应用程序,最好能够通过此接口传递区域性信息。对于 UDA 和 UDO,扩展插件作者可以实现附加的接口 IDeclareEventProperties,它允许检查或设置事件字段的区域性属性。为了实现此接口,您必须提供函数 DeclareEventProperties,该函数返回一个 CepEventType 对象,该对象可为其字段承载区域性信息,如下例中所示:

public class SampleUDO : CepOperator<Input, Output>, IDeclareEventProperties
{
    public override IEnumerable<Output> GenerateOutput(IEnumerable<Input> payloads)
    {
        ...
    }

    public CepEventType DeclareEventProperties(CepEventType outputEventType)
    {
        // assuming string field 'loc' in type Input
        // assuming string fields 'firstName' and 'location' in type Output
        outputEventType.Fields["firstName"].CultureInfo = new System.Globalization.CultureInfo("zh-CN");
        outputEventType.Fields["location"].CultureInfo = base.InputEventType.Fields["loc"].CultureInfo;
        return outputEventType;
    }
}

此 UDO 示例采用 Input 类型的输入类型并且生成 Output 类型的事件。该 Output 类型具有 UDO 作者想要显式用某些区域性信息加批注的字符串字段。名为 zh-CN 的区域性将应用于输出字段 firstName,而输出字段 location 将用与 UDO 的输入事件类型中的字段 loc 相关联的相同区域性加批注。对于该 UDO 在运行时生成的每个事件,这些区域性都在该事件插入到 UDO 的输出流之前应用于其字段。

对于用户定义聚合也存在相同的接口。因为聚合仅具有单个返回值,所以,为了将特定于区域性的信息应用于此类字段,IDeclareEventProperties 接口将返回值包装到具有单个字段的 CepEventType 中,以便提供使用特定于 CEP 的事件属性给该字段加批注的方法。

public class MyUDA : CepAggregate<Input, string>, IDeclareEventProperties
{
    public override string GenerateOutput(IEnumerable<Input> events)
    {
        ...
    }

    public CepEventType DeclareEventProperties(CepEventType outputEventType)
    {
        outputEventType.FieldsByOrdinal[0].CultureInfo = new System.Globalization.CultureInfo("zh-CN");
        return outputEventType;
    }
}

在这里,表示聚合结果的字符串将包装到 CepEventType 中,因此,UDA 作者可对该字段设置 CultureInfo 属性。此区域性信息将传播到在 LINQ 查询中接收聚合结果的实际事件字段(在其中将使用 UDA)。

请参阅

概念

使用事件窗口

其他资源

在 LINQ 中编写查询模板