Freigeben über


Abfragen beobachtbarer Sequenzen mithilfe von LINQ-Operatoren

In Bridging mit vorhandenen .NET-Ereignissen haben wir vorhandene .NET-Ereignisse in beobachtbare Sequenzen konvertiert, um sie zu abonnieren. In diesem Thema betrachten wir die erstklassige Natur beobachtbarer Sequenzen als IObservable<T-Objekte> , in denen generische LINQ-Operatoren von den Rx-Assemblys bereitgestellt werden, um diese Objekte zu bearbeiten. Die meisten Operatoren verwenden eine beobachtbare Sequenz, führen eine Logik darauf aus und geben eine weitere beobachtbare Sequenz aus. Darüber hinaus können Sie, wie Sie in unseren Codebeispielen sehen, sogar mehrere Operatoren in einer Quellsequenz verketten, um die resultierende Sequenz an Ihre genauen Anforderungen zu optimieren.

Verwenden verschiedener Operatoren

In den vorherigen Themen haben wir bereits die Operatoren Create und Generate verwendet, um einfache Sequenzen zu erstellen und zurückzugeben. Außerdem haben wir den Operator FromEventPattern verwendet, um vorhandene .NET-Ereignisse in beobachtbare Sequenzen zu konvertieren. In diesem Thema verwenden wir andere statische LINQ-Operatoren des Observable-Typs, damit Sie Daten filtern, gruppieren und transformieren können. Solche Operatoren verwenden beobachtbare Sequenzen als Eingabe und erzeugen beobachtbare Sequenzen als Ausgabe.

Kombinieren verschiedener Sequenzen

In diesem Abschnitt untersuchen wir einige der Operatoren, die verschiedene beobachtbare Sequenzen in einer einzelnen beobachtbaren Sequenz kombinieren. Beachten Sie, dass Daten beim Kombinieren von Sequenzen nicht transformiert werden.

Im folgenden Beispiel verwenden wir den Concat-Operator, um zwei Sequenzen in einer einzelnen Sequenz zu kombinieren und zu abonnieren. Zur Veranschaulichung verwenden wir den sehr einfachen Range(x, y)-Operator, um eine Sequenz von ganzen Zahlen zu erstellen, die mit x beginnt und anschließend y sequenzielle Zahlen erzeugt.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Concat(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Beachten Sie, dass die resultierende Sequenz ist 1,2,3,1,2,3. Dies liegt daran, dass bei Verwendung des Concat-Operators die 2. Sequenz (source2) erst aktiv ist, nachdem die 1. Sequenz (source1) alle ihre Werte gepusht hat. Erst nachdem source1 abgeschlossen ist, beginnt das Pushen von source2 Werten in die resultierende Sequenz. Der Abonnent ruft dann alle Werte aus der resultierenden Sequenz ab.

Vergleichen Sie dies mit dem Merge-Operator. Wenn Sie den folgenden Beispielcode ausführen, erhalten 1,1,2,2,3,3Sie . Dies liegt daran, dass die beiden Sequenzen gleichzeitig aktiv sind, und die Werte werden herausgepusht, wenn sie in den Quellen auftreten. Die resultierende Sequenz wird nur abgeschlossen, wenn die letzte Quellsequenz das Pushen von Werten abgeschlossen hat.

Beachten Sie, dass alle beobachtbaren Quellsequenzen vom gleichen Typ von IObservable<T> sein müssen, damit merge funktioniert. Die resultierende Sequenz ist vom Typ IObservable<T>. Wenn source1 in der Mitte der Sequenz ein OnError erzeugt wird, wird die resultierende Sequenz sofort abgeschlossen.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Merge(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Ein weiterer Vergleich kann mit dem Catch-Operator durchgeführt werden. Wenn in diesem Fall source1 ohne Fehler abgeschlossen wird, wird nicht source2 gestartet. Wenn Sie den folgenden Beispielcode ausführen, erhalten 1,2,3 Sie daher nur, da source2 (was erzeugt 4,5,6) ignoriert wird.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(4, 3);
source1.Catch(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Sehen wir uns abschließend OnErrorResumeNext an. Dieser Operator wechselt auch dann zu source2 , wenn source1 aufgrund eines Fehlers nicht abgeschlossen werden kann. Im folgenden Beispiel stellt der Abonnent zwar source1 eine Sequenz dar, die mit einer Ausnahme beendet wird (mithilfe des Throw-Operators), erhält jedoch die von veröffentlichten source2Werte (1,2,3). Wenn Sie also erwarten, dass eine Quellsequenz einen Fehler erzeugt, ist es sicherer, OnErrorResumeNext zu verwenden, um sicherzustellen, dass der Abonnent weiterhin einige Werte erhält.

var source1 = Observable.Throw<int>(new Exception("An error has occurred."));
var source2 = Observable.Range(4, 3);
source1.OnErrorResumeNext(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Beachten Sie, dass alle beobachtbaren Sequenzen vom gleichen T-Typ sein müssen, damit alle diese Kombinationsoperatoren funktionieren.

Projektion

Der Select-Operator kann jedes Element einer beobachtbaren Sequenz in eine andere Form übersetzen.

Im folgenden Beispiel projizieren wir eine Sequenz von ganzen Zahlen in Zeichenfolgen der Länge n.

var seqNum = Observable.Range(1, 5);
var seqString = from n in seqNum
                select new string('*', (int)n);
seqString.Subscribe(str => { Console.WriteLine(str); });
Console.ReadKey();

Im folgenden Beispiel, das eine Erweiterung des Beispiels für die .NET-Ereigniskonvertierung ist, das wir im Thema Bridging with Existing .NET Events gesehen haben, verwenden wir den Select-Operator, um den IEventPattern<MouseEventArgs-Datentyp in einen Point-Typ zu projizieren>. Auf diese Weise transformieren wir eine Mausverschiebungsereignissequenz in einen Datentyp, der weiter analysiert und bearbeitet werden kann, wie im nächsten Abschnitt "Filterung" zu sehen ist.

var frm = new Form();
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
                                          select evt.EventArgs.Location;
points.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

Sehen wir uns abschließend den SelectMany-Operator an. Der SelectMany-Operator verfügt über viele Überladungen, von denen eine ein Selektorfunktionsargument verwendet. Diese Selektorfunktion wird für jeden Wert aufgerufen, der von der beobachtbaren Quelle gepusht wird. Für jeden dieser Werte projiziert der Selektor ihn in eine mini beobachtbare Sequenz. Am Ende vereinfacht der SelectMany-Operator alle diese Minisequenzen in eine einzelne resultierende Sequenz, die dann an den Abonnenten gepusht wird.

Das von SelectMany zurückgegebene Beobachtbare veröffentlicht OnCompleted, nachdem die Quellsequenz und alle vom Selektor erzeugten mini-beobachtbaren Sequenzen abgeschlossen wurden. OnError wird ausgelöst, wenn im Quelldatenstrom ein Fehler aufgetreten ist, wenn eine Ausnahme von der Selektorfunktion ausgelöst wurde oder wenn in einer der mini beobachtbaren Sequenzen ein Fehler aufgetreten ist.

Im folgenden Beispiel erstellen wir zunächst eine Quellsequenz, die alle 5 Sekunden eine ganze Zahl erzeugt, und entscheiden uns, nur die ersten 2 erzeugten Werte zu übernehmen (mithilfe des Take-Operators). Anschließend wird verwendet SelectMany , um jede dieser ganzen Zahlen mit einer anderen Sequenz von zu projizieren {100, 101, 102}. Auf diese Weise werden zwei mini beobachtbare Sequenzen und erzeugt {100, 101, 102}{100, 101, 102}. Diese werden schließlich in einen einzelnen Stream von ganzen Zahlen von {100, 101, 102, 100, 101, 102} vereinfacht und an den Beobachter gepusht.

var source1 = Observable.Interval(TimeSpan.FromSeconds(5)).Take(2);
var proj = Observable.Range(100, 3);
var resultSeq = source1.SelectMany(proj);

var sub = resultSeq.Subscribe(x => Console.WriteLine("OnNext : {0}", x.ToString()),
                              ex => Console.WriteLine("Error : {0}", ex.ToString()),
                              () => Console.WriteLine("Completed"));
Console.ReadKey();

Filterung

Im folgenden Beispiel verwenden wir den Generate-Operator, um eine einfache beobachtbare Sequenz von Zahlen zu erstellen. Der Generate-Operator verfügt über mehrere Überladungen. In unserem Beispiel wird ein Anfangszustand (in unserem Beispiel 0), eine bedingte Funktion zum Beenden (weniger als 10 Mal), ein Iterator (+1) und eine Ergebnisauswahl (eine quadratische Funktion des aktuellen Werts) verwendet. , und drucken nur solche aus, die kleiner als 15 sind, indem Sie die Operatoren Where und Select verwenden.

  
IObservable<int> seq = Observable.Generate(0, i => i < 10, i => i + 1, i => i * i);
IObservable<int> source = from n in seq
                          where n < 5
                          select n;
source.Subscribe(x => {Console.WriteLine(x);});   // output is 0, 1, 4, 9
Console.ReadKey();

Das folgende Beispiel ist eine Erweiterung des Projektionsbeispiels, das Sie weiter oben in diesem Thema gesehen haben. In diesem Beispiel haben wir den Select-Operator verwendet, um den IEventPattern<MouseEventArgs-Datentyp> in einen Point-Typ zu projizieren. Im folgenden Beispiel verwenden wir den Operator Where und Select, um nur die mausbewegungen auszuwählen, die uns interessieren. In diesem Fall filtern wir die Mausbewegungen zu den Mausbewegungen über dem ersten Bisektor (wobei die x- und y-Koordinaten gleich sind).

var frm = new Form(); 
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
                                          select evt.EventArgs.Location;
var overfirstbisector = from pos in points
                        where pos.X == pos.Y 
                        select pos;
var movesub = overfirstbisector.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

Zeitbasierter Vorgang

Sie können die Pufferoperatoren verwenden, um zeitbasierte Vorgänge auszuführen.

Das Puffern einer beobachtbaren Sequenz bedeutet, dass die Werte einer beobachtbaren Sequenz basierend auf einem angegebenen Zeitraum oder einem Schwellenwert für die Anzahl in einen Puffer eingefügt werden. Dies ist besonders hilfreich in Situationen, in denen Sie erwarten, dass eine enorme Menge von Daten von der Sequenz gepusht wird und der Abonnent nicht über die Ressource zum Verarbeiten dieser Werte verfügt. Durch Puffern der Ergebnisse basierend auf Zeit oder Anzahl und die Rückgabe einer Sequenz von Werten, wenn die Kriterien überschritten werden (oder wenn die Quellsequenz abgeschlossen wurde), kann der Abonnent OnNext-Aufrufe in seinem eigenen Tempo verarbeiten. 

Im folgenden Beispiel erstellen wir zunächst eine einfache Sequenz von ganzen Zahlen für jede Sekunde. Anschließend verwenden wir den Pufferoperator und geben an, dass jeder Puffer 5 Elemente aus der Sequenz enthält. OnNext wird aufgerufen, wenn der Puffer voll ist. Anschließend wird die Summe des Puffers mithilfe des Sum-Operators ausgewertet. Der Puffer wird automatisch geleert, und ein weiterer Zyklus beginnt. Der Ausdruck ist 10, 35, 60… 10=0+1+2+3+4, 35=5+6+7+8+9 usw.

var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(5);
bufSeq.Subscribe(values => Console.WriteLine(values.Sum()));
Console.ReadKey();

Wir können auch einen Puffer mit einem angegebenen Zeitraum erstellen. Im folgenden Beispiel enthält der Puffer Elemente, die sich 3 Sekunden lang angesammelt haben. Der Ausdruck wird 3, 12, 21... in wobei 3=0+1+2, 12=3+4+5 usw.

var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(TimeSpan.FromSeconds(3));
bufSeq.Subscribe(value => Console.WriteLine(value.Sum()));  
Console.ReadKey();

Beachten Sie, dass Sie bei Verwendung von Puffer oder Fenster sicherstellen müssen, dass die Sequenz nicht leer ist, bevor Sie danach filtern.

LINQ-Operatoren nach Kategorien

Das Thema LINQ-Operatoren nach Kategorien listet alle wichtigen LINQ-Operatoren auf, die vom Observable-Typ nach ihren Kategorien implementiert werden. insbesondere: Erstellung, Konvertierung, Kombinieren, funktional, mathematisch, Zeit, Ausnahmen, Verschiedenes, Auswahl und Grundtypen.

Weitere Informationen

Verweis

Observable

Konzepte

LINQ-Operatoren nach Kategorien