Freigeben über


Observable.Create<TSource-Methode> (Func<IObserver<TSource>, IDisposable>)

Erstellt eine beobachtbare Sequenz aus einer Subscribe-Methodenimplementierung.

Namespace:System.Reactive.Linq
Versammlung: System.Reactive (in System.Reactive.dll)

Syntax

'Declaration
Public Shared Function Create(Of TSource) ( _
    subscribe As Func(Of IObserver(Of TSource), IDisposable) _
) As IObservable(Of TSource)
'Usage
Dim subscribe As Func(Of IObserver(Of TSource), IDisposable)
Dim returnValue As IObservable(Of TSource)

returnValue = Observable.Create(subscribe)
public static IObservable<TSource> Create<TSource>(
    Func<IObserver<TSource>, IDisposable> subscribe
)
public:
generic<typename TSource>
static IObservable<TSource>^ Create(
    Func<IObserver<TSource>^, IDisposable^>^ subscribe
)
static member Create : 
        subscribe:Func<IObserver<'TSource>, IDisposable> -> IObservable<'TSource> 
JScript does not support generic types and methods.

Typparameter

  • TSource
    Der Quellentyp.

Parameter

Rückgabewert

Typ: System.IObservable<TSource>
Die beobachtbare Sequenz mit der angegebenen Implementierung für die subscribe-Methode.

Bemerkungen

Mit dem Operator Create können Sie ihre eigenen benutzerdefinierten Sequenzen erstellen, ohne die IObservable<T-Schnittstelle> für Ihre Sequenz vollständig zu implementieren. Mit diesem Operator implementieren Sie einfach eine Subscribe-Funktion, die einen IObserver<T> akzeptiert, wobei T Ihr Typ ist und die IDisposable-Schnittstelle zurückgibt, die zum Kündigen des Abonnements verwendet wird, indem IDisposable::D ispose() aufgerufen wird. Die Verwendung dieses Operators wird gegenüber der manuellen Implementierung der IObservable<T-Schnittstelle> bevorzugt.

Beispiele

In diesem Beispiel wird ein hypothetisches Ticketsystem simuliert, bei dem eine beobachtbare Abfolge von Tickets bereitgestellt wird, ohne IObservable<Ticket> vollständig zu implementieren. Die TicketFactory-Klasse implementiert ihre eigene Subscribe-Methode namens TicketSubscribe. Diese Methode wird als subscribe-Parameter an den Create-Operator übergeben. TicketSubscribe erstellt eine fortlaufende Sequenz von Tickets in einem anderen Thread, bis das Abonnement gekündigt wird, indem die Dispose-Methode auf der IDisposable-Schnittstelle aufgerufen wird, die von TicketSubscribe zurückgegeben wird. Ein IObserver-Ticket<> wird an TicketSubscribe übergeben. Jedes Ticket in der Sequenz wird durch Aufrufen von IObserver<Ticket> übermittelt. OnNext(). Die für das Abonnement ausgeführte Beobachteraktion zeigt jedes Ticket im Konsolenfenster an.

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Reactive.Linq;

namespace Example
{

  class Program
  {

    static void Main()
    {
      IObservable<Ticket> ticketObservable = Observable.Create((Func<IObserver<Ticket>, IDisposable>)TicketFactory.TicketSubscribe);

      //*********************************************************************************//
      //*** This is a sequence of tickets. Display each ticket in the console window. ***//
      //*********************************************************************************//
      using(IDisposable handle = ticketObservable.Subscribe(ticket => Console.WriteLine(ticket.ToString())))
      {
        Console.WriteLine("\nPress ENTER to unsubscribe...\n");
        Console.ReadLine();
      }
    }
  }


  //***************************************************************************************************//
  //***                                                                                             ***//
  //*** The Ticket class just represents a hypothetical ticket composed of an ID and a timestamp.   ***//
  //***                                                                                             ***//
  //***************************************************************************************************//
  class Ticket
  {
    private readonly string ticketID;
    private readonly DateTime timeStamp;

    public Ticket(string tid)
    {
      ticketID = tid;
      timeStamp = DateTime.Now;
    }

    public override string ToString()
    {
      return String.Format("Ticket ID : {0}\nTimestamp : {1}\n", ticketID, timeStamp.ToString());
    }
  }


  //***************************************************************************************************//
  //***                                                                                             ***//
  //*** The TicketFactory class generates a new sequence of tickets on a separate thread. The       ***//
  //*** generation of the sequence is initiated by the TicketSubscribe method which will be passed  ***//
  //*** to Observable.Create().                                                                     ***//
  //***                                                                                             ***//
  //*** TicketSubscribe() provides the IDisposable interface used to dispose of the subscription    ***//
  //*** stopping ticket generation resources.                                                       ***//
  //***                                                                                             ***//
  //***************************************************************************************************//
  public class TicketFactory : IDisposable
  {
    private bool bGenerate = true;


    internal TicketFactory(object ticketObserver)
    {
      //************************************************************************//
      //*** The sequence generator for tickets will be run on another thread ***//
      //************************************************************************//
      Task.Factory.StartNew(new Action<object>(TicketGenerator), ticketObserver);
    }


    //**************************************************************************************************//
    //*** Dispose frees the ticket generating resources by allowing the TicketGenerator to complete. ***//
    //**************************************************************************************************//
    public void Dispose()
    {
      bGenerate = false;
    }


    //*****************************************************************************************************************//
    //*** TicketGenerator generates a new ticket every 3 sec and calls the observer's OnNext handler to deliver it. ***//
    //*****************************************************************************************************************//
    private void TicketGenerator(object observer)
    {
      IObserver<Ticket> ticketObserver = (IObserver<Ticket>)observer;

      //***********************************************************************************************//
      //*** Generate a new ticket every 3 sec and call the observer's OnNext handler to deliver it. ***//
      //***********************************************************************************************//
      Ticket t;

      while (bGenerate)
      {
        t = new Ticket(Guid.NewGuid().ToString());
        ticketObserver.OnNext(t);
        Thread.Sleep(3000);
      }
    }



    //********************************************************************************************************************************//
    //*** TicketSubscribe starts the flow of tickets for the ticket sequence when a subscription is created. It is passed to       ***//
    //*** Observable.Create() as the subscribe parameter. Observable.Create() returns the IObservable<Ticket> that is used to      ***//
    //*** create subscriptions by calling the IObservable<Ticket>.Subscribe() method.                                              ***//
    //***                                                                                                                          ***//
    //*** The IDisposable interface returned by TicketSubscribe is returned from the IObservable<Ticket>.Subscribe() call. Calling ***//
    //*** Dispose cancels the subscription freeing ticket generating resources.                                                    ***//
    //********************************************************************************************************************************//
    public static IDisposable TicketSubscribe(object ticketObserver)
    {
      TicketFactory tf = new TicketFactory(ticketObserver);

      return tf;
    }
  }
}

Im Folgenden finden Sie eine Beispielausgabe aus dem Beispielcode. Drücken Sie die EINGABETASTE, um die Registrierung abzubrechen.

Press ENTER to unsubscribe...

Ticket ID : a5715731-b9ba-4992-af00-d5030956cfc4
Timestamp : 5/18/2011 6:48:50 AM

Ticket ID : d9797b2b-a356-4928-bfce-797a1637b11d
Timestamp : 5/18/2011 6:48:53 AM

Ticket ID : bb01dc7d-1ed5-4ba0-9ce0-6029187792be
Timestamp : 5/18/2011 6:48:56 AM

Ticket ID : 0d3c95de-392f-4ed3-bbda-fed2c6bc7287
Timestamp : 5/18/2011 6:48:59 AM

Ticket ID : 4d19f79e-6d4f-4fec-83a8-9644a1d4e759
Timestamp : 5/18/2011 6:49:05 AM

Weitere Informationen

Verweis

Observable-Klasse

Erstellen einer Überladung

System.Reactive.Linq-Namespace