Поделиться через


Создание простых наблюдаемых последовательностей и подписка на нее

Для создания наблюдаемых последовательностей не требуется вручную реализовывать интерфейс IObservable<T> . Аналогичным образом, для подписки на последовательность не требуется реализовать IObserver<T> . Установив сборки реактивного расширения, вы можете воспользоваться преимуществами наблюдаемого типа, который предоставляет множество статических операторов LINQ для создания простой последовательности с нулевым, одним или несколькими элементами. Кроме того, Rx предоставляет методы расширения Subscribe, которые принимают различные сочетания обработчиков OnNext, OnError и OnCompleted с точки зрения делегатов.

Создание простой последовательности и подписка на нее

В следующем примере используется оператор Range типа Observable для создания простой наблюдаемой коллекции чисел. Наблюдатель подписывается на эту коллекцию с помощью метода Subscribe класса Observable и предоставляет действия, являющиеся делегатами, которые обрабатывают OnNext, OnError и OnCompleted.

Оператор Range имеет несколько перегрузок. В нашем примере он создает последовательность целых чисел, которая начинается с x и затем создает последовательные числа y. 

Как только подписка происходит, значения отправляются наблюдателю. Затем делегат OnNext выводит значения.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<int> source = Observable.Range(1, 10);
            IDisposable subscription = source.Subscribe(
                x => Console.WriteLine("OnNext: {0}", x),
                ex => Console.WriteLine("OnError: {0}", ex.Message),
                () => Console.WriteLine("OnCompleted"));
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
        }
    }
}

Когда наблюдатель подписывается на наблюдаемую последовательность, поток, вызывающий метод Subscribe, может отличаться от потока, в котором последовательность выполняется до завершения. Таким образом, вызов Subscribe является асинхронным, так как вызывающий объект не блокируется до завершения наблюдения за последовательностью. Дополнительные сведения см. в разделе Использование планировщиков .

Обратите внимание, что метод Subscribe возвращает IDisposable, чтобы можно было отменить подписку на последовательность и легко удалить ее. При вызове метода Dispose в наблюдаемой последовательности наблюдатель перестанет прослушивать наблюдаемый объект для данных.  Как правило, не требуется явным образом вызывать Dispose, если не требуется отменить подписку рано или если исходная наблюдаемая последовательность имеет более длительный срок жизни, чем у наблюдателя. Подписки в Rx предназначены для сценариев запуска и забывания без использования средства завершения. Когда сборщик мусора собирает экземпляр IDisposable, Rx не удаляет подписку автоматически. Однако обратите внимание, что по умолчанию операторы Observable по умолчанию удаляют подписку как можно скорее (т. е. при публикации сообщений OnCompleted или OnError). Например, код var x = Observable.Zip(a,b).Subscribe(); будет подписываться x на обе последовательности a и b. Если вызывает ошибку, x будет немедленно отменена подписка на b.

Вы также можете настроить пример кода, чтобы использовать оператор Create типа Observable , который создает и возвращает наблюдатель из указанных делегатов действий OnNext, OnError и OnCompleted. Затем можно передать этот наблюдатель в метод Subscribe типа Observable . В следующем примере показано, как это сделать.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<int> source = Observable.Range(1, 10);
            IObserver<int> obsvr = Observer.Create<int>(
                x => Console.WriteLine("OnNext: {0}", x),
                ex => Console.WriteLine("OnError: {0}", ex.Message),
                () => Console.WriteLine("OnCompleted"));
            IDisposable subscription = source.Subscribe(obsvr);
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
       }
    }
}

Помимо создания наблюдаемой последовательности с нуля, можно преобразовать существующие перечислители, события .NET и асинхронные шаблоны в наблюдаемые последовательности. В других разделах этого раздела показано, как это сделать.

Обратите внимание, что в этом разделе показано только несколько операторов, которые могут создавать наблюдаемую последовательность с нуля. Дополнительные сведения о других операторах LINQ см. в статье Запрос наблюдаемых последовательностей с помощью операторов LINQ.

Использование таймера

В следующем примере для создания последовательности используется оператор Timer. Последовательность вытесняет первое значение после истечения 5 секунд, а последующие значения будут вытеснены каждые 1 секунду. Для иллюстрации мы связаем оператор Timestamp с запросом, чтобы каждое вытесненное значение добавлялось к моменту публикации. Таким образом, когда мы подписываемся на эту исходную последовательность, мы можем получить как ее значение, так и метку времени.

Console.WriteLine(“Current Time: “ + DateTime.Now);

var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
                       .Timestamp();
using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
      {
           Console.WriteLine("Press any key to unsubscribe");
           Console.ReadKey();
      }
Console.WriteLine("Press any key to exit");
Console.ReadKey();

Результат должен быть аналогичен следующему:

Current Time: 5/31/2011 5:35:08 PM

Press any key to unsubscribe

0: 5/31/2011 5:35:13 PM -07:00

1: 5/31/2011 5:35:14 PM -07:00

2: 5/31/2011 5:35:15 PM -07:00

С помощью оператора Timestamp мы убедились, что первый элемент действительно вытеснен через 5 секунд после запуска последовательности, и каждый элемент публикуется через 1 секунду.

Преобразование перечисляемой коллекции в наблюдаемую последовательность

С помощью оператора ToObservable можно преобразовать универсальную перечисляемую коллекцию в наблюдаемую последовательность и подписаться на нее.

IEnumerable<int> e = new List<int> { 1, 2, 3, 4, 5 };

IObservable<int> source = e.ToObservable();
IDisposable subscription = source.Subscribe(
                            x => Console.WriteLine("OnNext: {0}", x),
                            ex => Console.WriteLine("OnError: {0}", ex.Message),
                            () => Console.WriteLine("OnCompleted"));
Console.ReadKey();

Холодные и горячие наблюдаемые компоненты

Холодные наблюдаемые объекты начинают выполняться после подписки, т. е. наблюдаемая последовательность начинает отправлять значения наблюдателям только при вызове Подписки. Значения также не передаются подписчикам. Это отличается от горячих наблюдаемых элементов, таких как события перемещения мыши или биржевые тикеры, которые уже создают значения еще до того, как подписка активна. Когда наблюдатель подписывается на горячую наблюдаемую последовательность, он получает текущее значение в потоке. Горячая наблюдаемая последовательность совместно используется для всех подписчиков, и каждому подписчику передается следующее значение последовательности. Например, даже если никто не подписался на определенный тиккер акций, тикер будет продолжать обновлять свою стоимость на основе движения рынка. Когда подписчик регистрирует интерес к этому тикеру, он автоматически получает последний тик.

В следующем примере демонстрируется холодная наблюдаемая последовательность. В этом примере мы используем оператор Interval для создания простой наблюдаемой последовательности чисел, откачиваемых через определенные интервалы, в данном случае каждые 1 секунду.

Затем два наблюдателя подписываются на эту последовательность и выводить ее значения. Вы заметите, что последовательность сбрасывается для каждого подписчика, в которой вторая подписка перезапустит последовательность из первого значения.

IObservable<int> source = Observable.Interval(TimeSpan.FromSeconds(1));   

IDisposable subscription1 = source.Subscribe(
                x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 1: OnCompleted"));

IDisposable subscription2 = source.Subscribe(
                x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 2: OnCompleted"));

Console.WriteLine("Press any key to unsubscribe");
Console.ReadLine();
subscription1.Dispose();
subscription2.Dispose();

В следующем примере мы преобразуем предыдущую холодную наблюдаемую последовательность source в горячую с помощью оператора Publish, который возвращает экземпляр IConnectableObservable, который называется hot. Оператор Publish предоставляет механизм совместного использования подписок путем трансляции одной подписки нескольким подписчикам. hot выступает в качестве прокси-сервера и подписывается sourceна , а затем, получая значения от source, отправляет их своим подписчикам. Чтобы создать подписку на резервную копию source и начать получать значения, мы используем метод IConnectableObservable.Connect(). Так как IConnectableObservable наследует IObservable, мы можем использовать подписку на эту горячую последовательность еще до ее запуска. Обратите внимание, что в примере горячая последовательность не запущена при subscription1 подписке на нее. Таким образом, никакие значения не отправляются подписчику. После вызова Connect значения отправляются в subscription1. После задержки в 3 секунды subscription2 подписывается hot на и начинает получать значения сразу из текущей позиции (в данном случае 3) до конца. Выходные данные выглядят следующим образом.

Current Time: 6/1/2011 3:38:49 PM

Current Time after 1st subscription: 6/1/2011 3:38:49 PM

Current Time after Connect: 6/1/2011 3:38:52 PM

Observer 1: OnNext: 0

Observer 1: OnNext: 1

Current Time just before 2nd subscription: 6/1/2011 3:38:55 PM 

Observer 1: OnNext: 2

Observer 1: OnNext: 3

Observer 2: OnNext: 3

Observer 1: OnNext: 4

Observer 2: OnNext: 4
       
Console.WriteLine("Current Time: " + DateTime.Now);
var source = Observable.Interval(TimeSpan.FromSeconds(1));            //creates a sequence

IConnectableObservable<long> hot = Observable.Publish<long>(source);  // convert the sequence into a hot sequence

IDisposable subscription1 = hot.Subscribe(                        // no value is pushed to 1st subscription at this point
                            x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
hot.Connect();       // hot is connected to source and starts pushing value to subscribers 
Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);

IDisposable subscription2 = hot.Subscribe(     // value will immediately be pushed to 2nd subscription
                            x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 2: OnCompleted"));
Console.ReadKey();