联接

本主题中的示例演示如何通过使用联接操作关联不同流中的值。联接操作将一个输入流中的每个事件与 CepStream 类型的一个或多个其他输入流中的每个事件进行比较。如果其有效时间间隔重叠并且联接条件有效,则该操作生成一个输出事件。

像在 LINQ 中一样,可通过使用 on … equals 子句或通过可引用两个或更多输入流的 where 子句,将联接条件指定为两个输入流之间的同等联接谓词。同等联接谓词可以比较单个字段(例如 where x.a equals y.a)或组合键(例如 where {x.a, x.b} equals {y.a, y.b} 或 where x equals y)。

示例

内部联接

在两个或更多输入流之间的联接谓词的计算结果为 true 时,内部联接操作返回这两个输入流中的所有事件。联接谓词是一个表达式,该表达式比较要联接的事件流的负载字段。内部联接可删除在其他指定事件流中没有匹配的事件的所有事件。

同等联接

下面的示例将流 stream1 中的事件与流 stream2 中的事件进行了比较。流中满足 on 子句中定义的相等条件(此外,还在两个事件的时间间隔中重叠)的事件将进行联接并输出到包含以下字段的新事件中:事件 e1 中的 i 和 j 负载字段以及事件 e2 中的 j 字段。

// Assuming the following input event type for both stream1 and stream2.
public class MyPayload
{
    public int i;
    public float j;
}

var equiJoin = from e1 in stream1
               join e2 in stream2
               on e1.i equals e2.i
               select new { e1.i, e1.j, e2.j };

相等谓词允许基元类型和复合类型的比较。例如,可以联接 on {e1i, e1j} equals {e2i, e2j}。

交叉联接

交叉联接(Cartesian 产品)操作返回一个事件流,该事件流结合了第一个流中的每个事件和第二个流中的每个事件(在各事件流的事件的时间间隔的重叠期间中)。筛选表达式可在 where 子句中用来限制每个输入流中的事件。特别要注意的是,对于两个输入流,使用检查是否相等的 where 子句的交叉联接等效于具有相应的 on … equals 子句的同等联接。对于非相等谓词或者超过两个输入流,必须使用交叉联接。

在下面的示例中,stream1 中负载字段 i 中的值大于 3 的事件与 stream2 中负载字段 j 中的值小于 10 的事件进行联接。

var crossJoin = from e1 in stream1
                from e2 in stream2
                where e1.i > 3 && e2.j < 10
                select new { e1.i, e2.j };

左反半部联接

左反半部联接仅在普通联接的结果为空时(对于每个时间点)为左侧上的每个事件生成联接结果。此操作用于检测到具有零个事件的间隔。

var leftAntiSemiJoin = from left in stream1 
                       where (from right in stream2 
                              where left.v == right.v
                              select right).IsEmpty()
                       select left;

下图说明具有两个示例输入流的以上联接结果,并且假定联接条件的计算结果为 true。此外还显示了中间的普通联接结果。

有两个流的反半部联接示例

联接多个流

您可以在单个查询中联接多个流,如以下示例中所示。

var slopetest = from f in fastSignal
                          from s in slowSignal
                          from r in refSignal
                          select new { alarm = f.avg / s.avg < r.Threshold };