다음을 통해 공유


단순 관찰 가능한 시퀀스 만들기 및 구독

관찰 가능한 시퀀스를 만들기 위해 IObservable<T> 인터페이스를 수동으로 구현할 필요가 없습니다. 마찬가지로, 시퀀스를 구독하기 위해 IObserver<T> 를 구현할 필요가 없습니다. 반응형 확장 어셈블리를 설치하면 많은 정적 LINQ 연산자를 제공하는 Observable 형식을 활용하여 0개 이상의 요소로 간단한 시퀀스를 만들 수 있습니다. 또한 Rx는 대리자 측면에서 OnNext, OnError 및 OnCompleted 처리기의 다양한 조합을 사용하는 구독 확장 메서드를 제공합니다.

간단한 시퀀스 만들기 및 구독

다음 샘플에서는 Observable 형식의 Range 연산자를 사용하여 관찰 가능한 간단한 숫자 컬렉션을 만듭니다. 관찰자는 Observable 클래스의 Subscribe 메서드를 사용하여 이 컬렉션을 구독하고 OnNext, OnError 및 OnCompleted를 처리하는 대리자인 작업을 제공합니다.

Range 연산자에는 여러 오버로드가 있습니다. 이 예제에서는 x로 시작하고 나중에 y 순차적 숫자를 생성하는 정수 시퀀스를 만듭니다. 

구독이 발생하는 즉시 값이 관찰자에게 전송됩니다. 그런 다음 OnNext 대리자는 값을 출력합니다.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<int> source = Observable.Range(1, 10);
            IDisposable subscription = source.Subscribe(
                x => Console.WriteLine("OnNext: {0}", x),
                ex => Console.WriteLine("OnError: {0}", ex.Message),
                () => Console.WriteLine("OnCompleted"));
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
        }
    }
}

관찰자가 관찰 가능한 시퀀스를 구독하는 경우 Subscribe 메서드를 호출하는 스레드는 시퀀스가 완료될 때까지 실행되는 스레드와 다를 수 있습니다. 따라서 구독 호출은 시퀀스의 관찰이 완료될 때까지 호출자가 차단되지 않는다는 측면에서 비동기적입니다. 이 내용은 스케줄러 사용 항목에서 자세히 설명합니다.

Subscribe 메서드는 시퀀스를 구독 취소하고 쉽게 삭제할 수 있도록 IDisposable을 반환합니다. 관찰 가능한 시퀀스에서 Dispose 메서드를 호출하면 관찰자가 관찰 가능한 데이터 수신 대기를 중지합니다.  일반적으로 일찍 구독을 취소해야 하거나 원본 관찰 가능한 시퀀스의 수명이 관찰자보다 긴 경우 Dispose를 명시적으로 호출할 필요가 없습니다. Rx의 구독은 종료자를 사용하지 않고 화재 및 잊어버리기 시나리오를 위해 설계되었습니다. 가비지 수집기에서 IDisposable instance 수집하는 경우 Rx는 구독을 자동으로 삭제하지 않습니다. 그러나 Observable 연산자의 기본 동작은 가능한 한 빨리 구독을 삭제하는 것입니다(즉, OnCompleted 또는 OnError 메시지가 게시된 경우). 예를 들어 코드 var x = Observable.Zip(a,b).Subscribe(); 는 x를 a와 b 시퀀스 모두에 구독합니다. 오류가 발생하면 x는 b에서 즉시 구독 취소됩니다.

지정된 OnNext, OnError 및 OnCompleted 작업 대리자의 관찰자를 만들고 반환하는 Observable 형식의 Create 연산자를 사용하도록 코드 샘플을 조정할 수도 있습니다. 그런 다음 이 관찰자를 Observable 형식의 Subscribe 메서드에 전달할 수 있습니다 . 다음 샘플에서는 이 작업을 수행하는 방법을 보여줍니다.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<int> source = Observable.Range(1, 10);
            IObserver<int> obsvr = Observer.Create<int>(
                x => Console.WriteLine("OnNext: {0}", x),
                ex => Console.WriteLine("OnError: {0}", ex.Message),
                () => Console.WriteLine("OnCompleted"));
            IDisposable subscription = source.Subscribe(obsvr);
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
       }
    }
}

관찰 가능한 시퀀스를 처음부터 만드는 것 외에도 기존 열거자, .NET 이벤트 및 비동기 패턴을 관찰 가능한 시퀀스로 변환할 수 있습니다. 이 섹션의 다른 topics 이 작업을 수행하는 방법을 보여줍니다.

이 항목에는 관찰 가능한 시퀀스를 처음부터 만들 수 있는 몇 가지 연산자만 표시됩니다. 다른 LINQ 연산자에 대한 자세한 내용은 LINQ 연산자를 사용하여 관찰 가능한 시퀀스 쿼리를 참조하세요.

타이머 사용

다음 샘플에서는 타이머 연산자를 사용하여 시퀀스를 만듭니다. 시퀀스는 5초가 경과한 후 첫 번째 값을 푸시한 다음 1초마다 후속 값을 푸시합니다. 설명 목적으로 푸시된 각 값이 게시된 시간에 추가되도록 Timestamp 연산자를 쿼리에 연결합니다. 이렇게 하면 이 원본 시퀀스를 구독할 때 해당 값과 타임스탬프를 모두 받을 수 있습니다.

Console.WriteLine(“Current Time: “ + DateTime.Now);

var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
                       .Timestamp();
using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
      {
           Console.WriteLine("Press any key to unsubscribe");
           Console.ReadKey();
      }
Console.WriteLine("Press any key to exit");
Console.ReadKey();

다음과 유사하게 출력됩니다.

Current Time: 5/31/2011 5:35:08 PM

Press any key to unsubscribe

0: 5/31/2011 5:35:13 PM -07:00

1: 5/31/2011 5:35:14 PM -07:00

2: 5/31/2011 5:35:15 PM -07:00

Timestamp 연산자를 사용하여 시퀀스가 시작된 후 5초 후에 첫 번째 항목이 실제로 푸시된 것을 확인했으며 각 항목은 1초 후에 게시됩니다.

열거 가능한 컬렉션을 관찰 가능한 시퀀스로 변환

ToObservable 연산자를 사용하여 제네릭 열거 가능한 컬렉션을 관찰 가능한 시퀀스로 변환하고 구독할 수 있습니다.

IEnumerable<int> e = new List<int> { 1, 2, 3, 4, 5 };

IObservable<int> source = e.ToObservable();
IDisposable subscription = source.Subscribe(
                            x => Console.WriteLine("OnNext: {0}", x),
                            ex => Console.WriteLine("OnError: {0}", ex.Message),
                            () => Console.WriteLine("OnCompleted"));
Console.ReadKey();

콜드 및 핫 관찰 가능 개체

콜드 관찰 가능 개체는 구독 시 실행을 시작합니다. 즉, 관찰 가능한 시퀀스는 구독이 호출될 때만 관찰자에게 값을 푸시하기 시작합니다. 구독자 간에도 값이 공유되지 않습니다. 이는 구독이 활성화되기 전에 이미 값을 생성하고 있는 마우스 이동 이벤트 또는 주식 시세와 같은 핫 관찰 가능 항목과 다릅니다. 관찰자가 관찰 가능한 핫 시퀀스를 구독하면 스트림의 현재 값을 가져옵니다. 관찰 가능한 핫 시퀀스는 모든 구독자 간에 공유되며 각 구독자는 시퀀스의 다음 값을 푸시합니다. 예를 들어 특정 주식 시세를 구독한 사람이 없더라도 시세는 시장 움직임에 따라 계속 가치를 업데이트합니다. 구독자가 이 티커에 관심을 등록하면 자동으로 최신 틱이 표시됩니다.

다음 예제에서는 콜드 관찰 가능한 시퀀스를 보여 줍니다. 이 예제에서는 Interval 연산자를 사용하여 1초마다 특정 간격으로 펌핑되는 간단한 관찰 가능한 숫자 시퀀스를 만듭니다.

그런 다음 두 명의 관찰자가 이 시퀀스를 구독하고 해당 값을 출력합니다. 두 번째 구독이 첫 번째 값에서 시퀀스를 다시 시작하는 각 구독자에 대해 시퀀스가 다시 설정됩니다.

IObservable<int> source = Observable.Interval(TimeSpan.FromSeconds(1));   

IDisposable subscription1 = source.Subscribe(
                x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 1: OnCompleted"));

IDisposable subscription2 = source.Subscribe(
                x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 2: OnCompleted"));

Console.WriteLine("Press any key to unsubscribe");
Console.ReadLine();
subscription1.Dispose();
subscription2.Dispose();

다음 예제에서는 이름을 로 지정할 instance IConnectableObservable을 반환하는 Publish 연산자를 사용하여 이전 콜드 관찰 가능한 hot시퀀스를 핫 시퀀스 source 로 변환합니다. Publish 연산자는 단일 구독을 여러 구독자에게 브로드캐스트하여 구독을 공유하는 메커니즘을 제공합니다. hot 는 프록시 역할을 하고 을 구독한 source다음, 에서 source값을 받으면 해당 값을 자체 구독자에게 푸시합니다. 지원 source 구독을 설정하고 값을 받기 시작하려면 IConnectableObservable.Connect() 메서드를 사용합니다. IConnectableObservable은 IObservable을 상속하므로 실행을 시작하기 전에 구독을 사용하여 이 핫 시퀀스를 구독할 수 있습니다. 이 예제에서는 핫 시퀀스를 구독할 때 subscription1 핫 시퀀스가 시작되지 않았습니다. 따라서 값이 구독자에게 푸시되지 않습니다. Connect를 호출한 후 값이 로 subscription1푸시됩니다. 3초 subscription2 의 지연 후 을 구독 hot 하고 종료될 때까지 현재 위치(이 경우 3)에서 바로 값을 수신하기 시작합니다. 출력은 다음과 같이 표시됩니다.

Current Time: 6/1/2011 3:38:49 PM

Current Time after 1st subscription: 6/1/2011 3:38:49 PM

Current Time after Connect: 6/1/2011 3:38:52 PM

Observer 1: OnNext: 0

Observer 1: OnNext: 1

Current Time just before 2nd subscription: 6/1/2011 3:38:55 PM 

Observer 1: OnNext: 2

Observer 1: OnNext: 3

Observer 2: OnNext: 3

Observer 1: OnNext: 4

Observer 2: OnNext: 4
       
Console.WriteLine("Current Time: " + DateTime.Now);
var source = Observable.Interval(TimeSpan.FromSeconds(1));            //creates a sequence

IConnectableObservable<long> hot = Observable.Publish<long>(source);  // convert the sequence into a hot sequence

IDisposable subscription1 = hot.Subscribe(                        // no value is pushed to 1st subscription at this point
                            x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
hot.Connect();       // hot is connected to source and starts pushing value to subscribers 
Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);

IDisposable subscription2 = hot.Subscribe(     // value will immediately be pushed to 2nd subscription
                            x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 2: OnCompleted"));
Console.ReadKey();