Поделиться через


Соединения

Примеры в этом разделе иллюстрируют сопоставление значений из разных потоков с помощью операции соединения. В операции соединения каждое событие из одного входного потока сравнивается с каждым событием из одного или нескольких других входных потоков типа CepStream. Если допустимые интервалы времени событий перекрываются и выполняется условие соединения, то операция создает одно событие выхода.

Как обычно в LINQ условие соединения может быть задано как предикат эквивалентного соединения между двумя входными потоками с помощью предложения on … equals или предложения where, которые могут ссылаться на два или более входных потоков. Предикат эквивалентного соединения позволяет сравнивать такие отдельные поля, как where x.a equals y.a или составные ключи, например where {x.a, x.b} equals {y.a, y.b} или where x equals y.

Примеры

Внутреннее соединение

Операция внутреннего соединения возвращает все события из двух или более входных потоков, если вычисление предиката соединения между ними приводит к получению значения true. Предикат соединения — это выражение, в котором сравниваются поля полезных данных соединяемых потоков событий. Во внутреннем соединении устраняются все события, не имеющие согласующегося события в других указанных потоках событий.

Эквивалентное соединение

В следующем примере события в потоке stream1 сравниваются с событиями в потоке stream2. События в потоке, которые соответствуют критериям равенства, определенным в предложении on (в дополнение к требованию перекрытия интервалов времени для двух событий), соединяются и выводятся в новое событие, которое содержит поля полезных данных i и j из события e1 и поле j из события e2.

// Assuming the following input event type for both stream1 and stream2.
public class MyPayload
{
    public int i;
    public float j;
}

var equiJoin = from e1 in stream1
               join e2 in stream2
               on e1.i equals e2.i
               select new { e1.i, e1.j, e2.j };

Предикат равенства обеспечивает сравнение как типов-примитивов, так и составных типов. Например, предусмотрена возможность выполнить соединение on {e1i, e1j} equals {e2i, e2j}.

Перекрестное соединение

Операция перекрестного соединения (декартово произведение) возвращает поток событий, являющийся сочетанием каждого события из первого потока с каждым событием из второго потока. Продолжительностью служит пересечение интервалов событий из каждого потока. Критерий фильтра может использоваться в предложении where для наложения ограничений на события из каждого входного потока. Важно отметить, что для двух входных потоков перекрестное соединение с предложением where, которое выполняет проверку на наличие равенства, равносильно эквивалентному соединению с соответствующим предложением on … equals. Перекрестное соединение должно использоваться для предикатов неравенства или для более чем двух входных потоков.

В следующем примере события в потоке stream1, имеющие значение в поле полезных данных i, которое превышает 3, соединяются с событиями из stream2 со значением в поле полезных данных j, которое меньше 10.

var crossJoin = from e1 in stream1
                from e2 in stream2
                where e1.i > 3 && e2.j < 10
                select new { e1.i, e2.j };

Левое антиполусоединение

Левое антиполусоединение дает результат соединения для каждого события с левой стороны только в случае, если в каждый момент времени обычное соединение пусто. Эта операция удобна для обнаружения интервалов, содержащих нулевое число событий.

var leftAntiSemiJoin = from left in stream1 
                       where (from right in stream2 
                              where left.v == right.v
                              select right).IsEmpty()
                       select left;

На следующем рисунке показан результат приведенного выше соединения с двумя примерами входных потоков в предположении, что условие соединения имеет значение true. Также выводится промежуточный результат обычного соединения.

Пример антиполусоединения с двумя потоками

Соединение нескольких потоков

Предусмотрена возможность соединения нескольких потоков в одном запросе, как показано в следующем примере.

var slopetest = from f in fastSignal
                          from s in slowSignal
                          from r in refSignal
                          select new { alarm = f.avg / s.avg < r.Threshold };