Condividi tramite


Creazione e sottoscrizione di sequenze osservabili semplici

Non è necessario implementare manualmente l'interfaccia IObservable<T> per creare sequenze osservabili. Analogamente, non è necessario implementare IObserver<T> per sottoscrivere una sequenza. Installando gli assembly dell'estensione reattiva, è possibile sfruttare il tipo Observable che fornisce molti operatori LINQ statici per creare una sequenza semplice con zero, uno o più elementi. Rx fornisce inoltre metodi di estensione Subscribe che accettano varie combinazioni di gestori OnNext, OnError e OnCompleted in termini di delegati.

Creazione e sottoscrizione di una sequenza semplice

Nell'esempio seguente viene utilizzato l'operatore Range del tipo Observable per creare una semplice raccolta osservabile di numeri. L'osservatore sottoscrive questa raccolta usando il metodo Subscribe della classe Observable e fornisce azioni che gestiscono OnNext, OnError e OnCompleted.

L'operatore Range ha diversi overload. In questo esempio viene creata una sequenza di interi che inizia con x e produce numeri sequenziali y in un secondo momento. 

Non appena avviene la sottoscrizione, i valori vengono inviati all'osservatore. Il delegato OnNext stampa quindi i valori.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<int> source = Observable.Range(1, 10);
            IDisposable subscription = source.Subscribe(
                x => Console.WriteLine("OnNext: {0}", x),
                ex => Console.WriteLine("OnError: {0}", ex.Message),
                () => Console.WriteLine("OnCompleted"));
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
        }
    }
}

Quando un osservatore sottoscrive una sequenza osservabile, il thread che chiama il metodo Subscribe può essere diverso dal thread in cui la sequenza viene eseguita fino al completamento. Pertanto, la chiamata Subscribe è asincrona in quanto il chiamante non viene bloccato fino al completamento dell'osservazione della sequenza. Questo argomento verrà trattato in dettaglio nell'argomento Uso delle utilità di pianificazione .

Si noti che il metodo Subscribe restituisce un IDisposable, in modo che sia possibile annullare la sottoscrizione a una sequenza ed eliminarlo facilmente. Quando si richiama il metodo Dispose nella sequenza osservabile, l'osservatore smetterà di ascoltare l'osservabile per i dati.  In genere, non è necessario chiamare in modo esplicito Dispose a meno che non sia necessario annullare la sottoscrizione anticipata o quando la sequenza osservabile di origine ha un intervallo di vita più lungo rispetto all'osservatore. Le sottoscrizioni in Rx sono progettate per scenari fire-and-forget senza l'uso di un finalizzatore. Quando l'istanza IDisposable viene raccolta dal Garbage Collector, Rx non elimina automaticamente la sottoscrizione. Si noti tuttavia che il comportamento predefinito degli operatori Observable consiste nell'eliminare la sottoscrizione il prima possibile, ad esempio quando vengono pubblicati messaggi OnCompleted o OnError. Ad esempio, il codice var x = Observable.Zip(a,b).Subscribe(); sottoscriverà x a entrambe le sequenze a e b. Se viene generato un errore, x verrà annullata immediatamente la sottoscrizione a b.

È anche possibile modificare l'esempio di codice per usare l'operatore Create del tipo Observable , che crea e restituisce un osservatore dai delegati di azione OnNext, OnError e OnCompleted specificati. È quindi possibile passare questo osservatore al metodo Subscribe del tipo Observable . L'esempio seguente illustra come eseguire questa operazione.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<int> source = Observable.Range(1, 10);
            IObserver<int> obsvr = Observer.Create<int>(
                x => Console.WriteLine("OnNext: {0}", x),
                ex => Console.WriteLine("OnError: {0}", ex.Message),
                () => Console.WriteLine("OnCompleted"));
            IDisposable subscription = source.Subscribe(obsvr);
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
       }
    }
}

Oltre a creare una sequenza osservabile da zero, è possibile convertire gli enumeratori esistenti, gli eventi .NET e i modelli asincroni in sequenze osservabili. Gli altri argomenti di questa sezione illustrano come eseguire questa operazione.

Si noti che in questo argomento vengono illustrati solo alcuni operatori che possono creare una sequenza osservabile da zero. Per altre informazioni sugli altri operatori LINQ, vedere Esecuzione di query sulle sequenze osservabili tramite operatori LINQ.

Uso di un timer

Nell'esempio seguente viene utilizzato l'operatore Timer per creare una sequenza. La sequenza eseguirà il push del primo valore dopo la scadenza di 5 secondi, quindi eseguirà il push dei valori successivi ogni 1 secondo. A scopo illustrativo, concateniamo l'operatore Timestamp alla query in modo che ogni valore inserito venga aggiunto al momento della pubblicazione. In questo modo, quando si sottoscrive questa sequenza di origine, è possibile ricevere sia il valore che il timestamp.

Console.WriteLine(“Current Time: “ + DateTime.Now);

var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
                       .Timestamp();
using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
      {
           Console.WriteLine("Press any key to unsubscribe");
           Console.ReadKey();
      }
Console.WriteLine("Press any key to exit");
Console.ReadKey();

L'output sarà simile al seguente:

Current Time: 5/31/2011 5:35:08 PM

Press any key to unsubscribe

0: 5/31/2011 5:35:13 PM -07:00

1: 5/31/2011 5:35:14 PM -07:00

2: 5/31/2011 5:35:15 PM -07:00

Usando l'operatore Timestamp, è stato verificato che il primo elemento viene effettivamente eseguito il push di 5 secondi dopo l'avvio della sequenza e ogni elemento viene pubblicato 1 secondo in seguito.

Conversione di una raccolta enumerabile in una sequenza osservabile

Usando l'operatore ToObservable, è possibile convertire una raccolta enumerabile generica in una sequenza osservabile e sottoscriverla.

IEnumerable<int> e = new List<int> { 1, 2, 3, 4, 5 };

IObservable<int> source = e.ToObservable();
IDisposable subscription = source.Subscribe(
                            x => Console.WriteLine("OnNext: {0}", x),
                            ex => Console.WriteLine("OnError: {0}", ex.Message),
                            () => Console.WriteLine("OnCompleted"));
Console.ReadKey();

Cold vs. Hot Observables

Gli osservabili a freddo iniziano a essere eseguiti durante la sottoscrizione, ovvero la sequenza osservabile inizia a eseguire il push dei valori solo agli osservatori quando viene chiamato Subscribe. I valori non vengono condivisi tra sottoscrittori. Questo è diverso dagli osservabili ad accesso frequente, ad esempio gli eventi di spostamento del mouse o i ticker azionari, che stanno già producendo valori anche prima che una sottoscrizione sia attiva. Quando un osservatore sottoscrive una sequenza osservabile ad accesso frequente, ottiene il valore corrente nel flusso. La sequenza osservabile ad accesso frequente viene condivisa tra tutti i sottoscrittori e viene eseguito il push del valore successivo nella sequenza. Ad esempio, anche se nessuno ha sottoscritto una determinata ticker azionaria, il ticker continuerà ad aggiornarne il valore in base al movimento di mercato. Quando un sottoscrittore registra l'interesse in questo ticker, otterrà automaticamente il segno di spunta più recente.

Nell'esempio seguente viene illustrata una sequenza osservabile a freddo. In questo esempio viene usato l'operatore Interval per creare una semplice sequenza osservabile di numeri pompati a intervalli specifici, in questo caso ogni 1 secondo.

Due osservatori sottoscrivono quindi questa sequenza e ne stampano i valori. Si noterà che la sequenza viene reimpostata per ogni sottoscrittore, in cui la seconda sottoscrizione riavvia la sequenza dal primo valore.

IObservable<int> source = Observable.Interval(TimeSpan.FromSeconds(1));   

IDisposable subscription1 = source.Subscribe(
                x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 1: OnCompleted"));

IDisposable subscription2 = source.Subscribe(
                x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 2: OnCompleted"));

Console.WriteLine("Press any key to unsubscribe");
Console.ReadLine();
subscription1.Dispose();
subscription2.Dispose();

Nell'esempio seguente la sequenza osservabile a freddo precedente viene convertita in una sequenza source ad accesso frequente usando l'operatore Publish, che restituisce un'istanza IConnectableObservable denominata hot. L'operatore Publish fornisce un meccanismo per condividere le sottoscrizioni trasmettendo una singola sottoscrizione a più sottoscrittori. hot funge da proxy e sottoscrive source, quindi quando riceve i valori da source, li inserisce nei propri sottoscrittori. Per stabilire una sottoscrizione al backup source e iniziare a ricevere i valori, viene usato il metodo IConnectableObservable.Connect(). Poiché IConnectableObservable eredita IObservable, è possibile usare Subscribe per sottoscrivere questa sequenza ad accesso frequente anche prima dell'avvio dell'esecuzione. Si noti che nell'esempio la sequenza ad accesso frequente non è stata avviata quando subscription1 la sottoscrive. Di conseguenza, non viene eseguito il push di alcun valore nel sottoscrittore. Dopo aver chiamato Connect, i valori vengono quindi inseriti in subscription1. Dopo un ritardo di 3 secondi, subscription2 sottoscrive e inizia a hot ricevere i valori immediatamente dalla posizione corrente (3 in questo caso) fino alla fine. L'output sarà simile al seguente:

Current Time: 6/1/2011 3:38:49 PM

Current Time after 1st subscription: 6/1/2011 3:38:49 PM

Current Time after Connect: 6/1/2011 3:38:52 PM

Observer 1: OnNext: 0

Observer 1: OnNext: 1

Current Time just before 2nd subscription: 6/1/2011 3:38:55 PM 

Observer 1: OnNext: 2

Observer 1: OnNext: 3

Observer 2: OnNext: 3

Observer 1: OnNext: 4

Observer 2: OnNext: 4
       
Console.WriteLine("Current Time: " + DateTime.Now);
var source = Observable.Interval(TimeSpan.FromSeconds(1));            //creates a sequence

IConnectableObservable<long> hot = Observable.Publish<long>(source);  // convert the sequence into a hot sequence

IDisposable subscription1 = hot.Subscribe(                        // no value is pushed to 1st subscription at this point
                            x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
hot.Connect();       // hot is connected to source and starts pushing value to subscribers 
Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);

IDisposable subscription2 = hot.Subscribe(     // value will immediately be pushed to 2nd subscription
                            x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 2: OnCompleted"));
Console.ReadKey();