Поделиться через


Запрос наблюдаемых последовательностей с помощью операторов LINQ

В разделе Объединение с существующими событиями .NET мы преобразовали существующие события .NET в отслеживаемые последовательности, чтобы подписаться на них. В этом разделе мы рассмотрим первоклассную природу наблюдаемых последовательностей как объектов IObservable<T> , в которых универсальные операторы LINQ предоставляются сборками Rx для управления этими объектами. Большинство операторов принимают наблюдаемую последовательность, выполняют в ней некоторую логику и выводить другую наблюдаемую последовательность. Кроме того, как видно из наших примеров кода, можно даже связать несколько операторов в исходной последовательности, чтобы настроить результирующую последовательность в зависимости от ваших требований.

Использование различных операторов

Мы уже использовали операторы Create и Generate в предыдущих разделах для создания и возврата простых последовательностей. Мы также использовали оператор FromEventPattern для преобразования существующих событий .NET в наблюдаемые последовательности. В этом разделе мы будем использовать другие статические операторы LINQ наблюдаемого типа, чтобы можно было фильтровать, группировать и преобразовывать данные. Такие операторы принимают наблюдаемые последовательности в качестве входных данных и создают наблюдаемые последовательности в качестве выходных данных.

Объединение различных последовательностей

В этом разделе мы рассмотрим некоторые операторы, которые объединяют различные наблюдаемые последовательности в одну наблюдаемую последовательность. Обратите внимание, что данные не преобразуются при объединении последовательностей.

В следующем примере мы используем оператор Concat, чтобы объединить две последовательности в одну последовательность и подписаться на нее. В целях иллюстрации мы будем использовать очень простой оператор Range(x, y) для создания последовательности целых чисел, которая начинается с x и затем создает последовательные числа y.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Concat(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Обратите внимание, что результирующим порядком является 1,2,3,1,2,3. Это связано с тем, что при использовании оператора Concat второе последовательность (source2) не будет активной до тех пор, пока 1-я последовательность (source1) не завершит отправку всех своих значений. Только после source1 завершения source2 начинается отправка значений в результирующий последовательности. Затем подписчик получит все значения из результирующих последовательностей.

Сравните это с оператором Merge. При выполнении следующего примера кода вы получите .1,1,2,2,3,3 Это связано с тем, что две последовательности активны одновременно, а значения выталкиваются по мере их появления в источниках. Результирующий последовательность завершается только после того, как последняя исходная последовательность завершила отправку значений.

Обратите внимание, что для работы merge все исходные наблюдаемые последовательности должны иметь один и тот же тип IObservable<T>. Результируемая последовательность будет иметь тип IObservable<T>. Если source1 создает OnError в середине последовательности, результирующий последовательность завершится немедленно.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Merge(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Другое сравнение можно выполнить с оператором Catch. В этом случае, если source1 завершается без ошибок, то source2 не запускается. Таким образом, при выполнении следующего примера кода вы получите только 1,2,3 так, как source2 (который создает 4,5,6) игнорируется.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(4, 3);
source1.Catch(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Наконец, рассмотрим OnErrorResumeNext. Этот оператор будет переходить к , source2 даже если source1 не удается завершить из-за ошибки. В следующем примере, хотя source1 представляет последовательность, которая завершается с исключением (с помощью оператора Throw), подписчик будет получать значения (1,2,3), опубликованные .source2 Таким образом, если предполагается, что любая исходная последовательность создаст ошибку, безопаснее использовать OnErrorResumeNext, чтобы гарантировать, что подписчик по-прежнему будет получать некоторые значения.

var source1 = Observable.Throw<int>(new Exception("An error has occurred."));
var source2 = Observable.Range(4, 3);
source1.OnErrorResumeNext(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Обратите внимание, что для работы всех этих операторов сочетания все наблюдаемые последовательности должны иметь один и тот же тип T.

Прогнозирование

Оператор Select может преобразовать каждый элемент наблюдаемой последовательности в другую форму.

В следующем примере мы проецируем последовательность целых чисел в строки длиной n соответственно.

var seqNum = Observable.Range(1, 5);
var seqString = from n in seqNum
                select new string('*', (int)n);
seqString.Subscribe(str => { Console.WriteLine(str); });
Console.ReadKey();

В следующем примере, который является расширением примера преобразования событий .NET, который мы видели в разделе Мост с существующими событиями .NET , мы используем оператор Select для проецирования типа данных IEventPattern<MouseEventArgs> в тип Point . Таким образом, мы преобразуем последовательность событий перемещения мыши в тип данных, который можно анализировать и обрабатывать дальше, как показано в следующем разделе "Фильтрация".

var frm = new Form();
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
                                          select evt.EventArgs.Location;
points.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

Наконец, рассмотрим оператор SelectMany. Оператор SelectMany имеет много перегрузок, одна из которых принимает аргумент функции селектора. Эта функция селектора вызывается для каждого значения, вытесненного наблюдаемым источником. Для каждого из этих значений селектор проецит его в мини-наблюдаемую последовательность. В конце оператор SelectMany преобразует все эти мини-последовательности в одну результирующую последовательность, которая затем передается подписчику.

Наблюдаемый объект, возвращаемый из SelectMany, публикует OnCompleted после завершения исходной последовательности и всех мини-наблюдаемых последовательностей, созданных селектором. Он срабатывает При возникновении ошибки в исходном потоке, при возникновении исключения функцией селектора или при возникновении ошибки в любой из мини-наблюдаемых последовательностей.

В следующем примере мы сначала создадим исходную последовательность, которая каждые 5 секунд создает целое число, и решаем просто взять первые 2 значения ( с помощью оператора Take). Затем мы используем для SelectMany проецировать каждое из этих целых чисел с помощью другой последовательности {100, 101, 102}. Таким образом, создаются две мини-наблюдаемые последовательности: {100, 101, 102} и {100, 101, 102}. Наконец, они объединяются в единый поток целых {100, 101, 102, 100, 101, 102} чисел и передаются наблюдателю.

var source1 = Observable.Interval(TimeSpan.FromSeconds(5)).Take(2);
var proj = Observable.Range(100, 3);
var resultSeq = source1.SelectMany(proj);

var sub = resultSeq.Subscribe(x => Console.WriteLine("OnNext : {0}", x.ToString()),
                              ex => Console.WriteLine("Error : {0}", ex.ToString()),
                              () => Console.WriteLine("Completed"));
Console.ReadKey();

Фильтрация

В следующем примере мы используем оператор Generate для создания простой наблюдаемой последовательности чисел. Оператор Generate имеет несколько перегрузок. В нашем примере он принимает начальное состояние (0 в нашем примере), условную функцию для завершения (менее 10 раз), итератор (+1), селектор результатов (квадратную функцию текущего значения). и выведите только те, которые меньше 15, используя операторы Where и Select.

  
IObservable<int> seq = Observable.Generate(0, i => i < 10, i => i + 1, i => i * i);
IObservable<int> source = from n in seq
                          where n < 5
                          select n;
source.Subscribe(x => {Console.WriteLine(x);});   // output is 0, 1, 4, 9
Console.ReadKey();

Следующий пример является расширением примера проекции, который вы видели ранее в этом разделе. В этом примере мы использовали оператор Select для проецирования типа данных IEventPattern<MouseEventArgs> в тип Point . В следующем примере мы используем операторы Where и Select, чтобы выбрать только те движения мыши, которые нас интересуют. В этом случае мы отфильтруем перемещение мыши по первому бисектору (где координаты x и y равны).

var frm = new Form(); 
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
                                          select evt.EventArgs.Location;
var overfirstbisector = from pos in points
                        where pos.X == pos.Y 
                        select pos;
var movesub = overfirstbisector.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

Операция на основе времени

Операторы Buffer можно использовать для выполнения операций на основе времени.

Буферизация наблюдаемой последовательности означает, что значения наблюдаемой последовательности помещаются в буфер на основе заданного интервала времени или порога счетчика. Это особенно полезно в ситуациях, когда вы ожидаете, что последовательность вытесняет огромный объем данных, а у подписчика нет ресурса для обработки этих значений. Буферизируя результаты на основе времени или счетчика и возвращая последовательность значений только при превышении условий (или при завершении исходной последовательности), подписчик может обрабатывать вызовы OnNext в собственном темпе. 

В следующем примере мы сначала создадим простую последовательность целых чисел для каждой секунды. Затем мы используем оператор Buffer и указываем, что каждый буфер будет содержать 5 элементов из последовательности. OnNext вызывается, когда буфер заполнен. Затем мы подсведем сумму буфера с помощью оператора Sum. Буфер автоматически очищается, и начинается еще один цикл. Распечатка будет 10, 35, 60… иметь значение 10=0+1+2+3+4, 35=5+6+7+8+9 и т. д.

var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(5);
bufSeq.Subscribe(values => Console.WriteLine(values.Sum()));
Console.ReadKey();

Можно также создать буфер с указанным диапазоном времени. В следующем примере буфер будет содержать элементы, накопленные в течение 3 секунд. Распечатка будет 3, 12, 21... в котором 3=0+1+2, 12=3+4+5 и т. д.

var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(TimeSpan.FromSeconds(3));
bufSeq.Subscribe(value => Console.WriteLine(value.Sum()));  
Console.ReadKey();

Обратите внимание, что при использовании буфера или окна необходимо убедиться, что последовательность не пуста перед фильтрацией по ней.

Операторы LINQ по категориям

В разделе Операторы LINQ по категориям перечислены все основные операторы LINQ, реализованные типом Наблюдаемые по категориям; в частности: создание, преобразование, объединение, функциональная, математическая, время, исключения, прочее, выбор и примитивы.

См. также:

Ссылка

Observable

Основные понятия

Операторы LINQ по категориям