Freigeben über


Schnellstart für Orleans-Streaming

Dieser Leitfaden zeigt Ihnen eine schnelle Möglichkeit zum Einrichten und Verwenden von Orleans-Streams. Weitere Informationen zu den Streamingfeatures finden Sie in anderen Teilen dieser Dokumentation.

Erforderliche Konfigurationen

In diesem Leitfaden verwenden Sie einen speicherbasierten Stream, der Grain Messaging verwendet, um Streamdaten an Abonnenten zu senden. Sie verwenden den Speicheranbieter im Arbeitsspeicher, um Listen von Abonnements zu speichern. Die Verwendung speicherbasierter Mechanismen für Streaming und Speicher ist nur für lokale Entwicklung und Tests vorgesehen und nicht für Produktionsumgebungen vorgesehen.

Rufen Sie im Silo , wobei silo ein ISiloBuilderist, folgendes auf AddMemoryStreams:

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

Rufen Sie auf dem Clusterclient , wobei client ein IClientBuilderist, auf AddMemoryStreams.

client.AddMemoryStreams("StreamProvider");

In diesem Leitfaden verwenden Sie einen speicherbasierten Stream, der Grain Messaging verwendet, um Streamdaten an Abonnenten zu senden. Wir verwenden den In-Memory-Speicheranbieter zum Speichern von Abonnementlisten, daher ist dies keine sinnvolle Wahl für echte Produktionsanwendungen.

Rufen Sie im Silo , wobei hostBuilder ein ISiloHostBuilderist, folgendes auf AddSimpleMessageStreamProvider:

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

Rufen Sie auf dem Clusterclient , wobei clientBuilder ein IClientBuilderist, auf AddSimpleMessageStreamProvider.

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

Hinweis

Standardmäßig gelten Nachrichten, die über den Einfachen Nachrichtenstream übergeben werden, als unveränderlich und können durch Verweis auf andere Grains übergeben werden. Um dieses Verhalten zu deaktivieren, müssen Sie den SMS-Anbieter so konfigurieren, dass er deaktiviert wird. SimpleMessageStreamProviderOptions.OptimizeForImmutableData

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

Sie können Streams erstellen, Daten mit ihnen als Produzenten senden und auch Daten als Abonnenten empfangen.

Ereignisse erstellen

Es ist relativ einfach, Ereignisse für Streams zu erzeugen. Sie sollten zuerst Zugriff auf den Streamanbieter erhalten, den Sie zuvor in der Konfiguration ("StreamProvider") definiert haben, und dann einen Stream auswählen und Daten per Push an diesen übertragen.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

Es ist relativ einfach, Ereignisse für Streams zu erzeugen. Sie sollten zuerst Zugriff auf den Streamanbieter erhalten, den Sie zuvor in der Konfiguration ("SMSProvider") definiert haben, und dann einen Stream auswählen und Daten per Push an diesen übertragen.

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

Wie Sie sehen, verfügt unser Stream über eine GUID und einen Namespace. Dies erleichtert die Identifizierung eindeutiger Datenströme. Beispielsweise kann der Namespace für einen Chatroom „Räume" und die GUID die GUID sein, die die GUID von RoomGrain besitzt.

Hier verwenden wir die GUID eines bekannten Chatrooms. Mit der OnNextAsync Methode des Streams können wir Daten per Push an ihn übertragen. Lassen Sie uns dies in einem Timer mithilfe von Zufallszahlen tun. Sie können auch jeden anderen Datentyp für den Stream verwenden.

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

Abonnieren und Empfangen von Streamingdaten

Für den Empfang von Daten können Sie implizite und explizite Abonnements verwenden, die in Explizite und implizite Abonnements ausführlicher beschrieben werden. In diesem Beispiel werden implizite Abonnements verwendet, was einfacher ist. Wenn ein Grain-Typ implizit einen Stream abonnieren möchte, verwendet er das Attribut [ImplicitStreamSubscription(namespace)].

Definieren Sie für Ihren Fall folgendes ReceiverGrain :

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

Wenn Daten wie im Timer an die Datenströme des Namespace RANDOMDATAgepusht werden, empfängt ein Körnchen des Typs ReceiverGrain mit demselben Guid Datenstrom die Nachricht. Auch wenn derzeit keine Aktivierungen des Grains vorhanden sind, erstellt die Runtime automatisch eine neue und sendet die Nachricht an ihn.

Damit dies funktioniert, müssen wir den Abonnementprozess abschließen, indem wir unsere OnNextAsync Methode für den Empfang von Daten festlegen. Um dies zu tun, sollten wir ReceiverGrain so etwas in seiner OnActivateAsync

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

Alles erledigt! Jetzt ist die einzige Anforderung, dass etwas die Erstellung des Erzeugerkorns auslöst, und dann registriert es den Timer und beginnt, zufällige Ints an alle interessierten Parteien zu senden.

Auch hier überspringt dieser Leitfaden viele Details und ist nur gut, um das große Ganze zu zeigen. Lesen Sie andere Teile dieses Handbuchs und andere Ressourcen auf RX, um ein gutes Verständnis dafür zu erhalten, was verfügbar ist und wie.

Reaktive Programmierung kann ein sehr leistungsstarker Ansatz sein, um viele Probleme zu lösen. Sie könnten beispielsweise LINQ im Abonnenten verwenden, um Zahlen zu filtern und alle möglichen interessanten Dinge zu erledigen.

Siehe auch

Orleans-Streams-Programmier-APIs