Streaming mit Orleans
In Orleans V.1.0.0 wurde dem Programmiermodell Unterstützung für Streamingerweiterungen hinzugefügt. Streamingerweiterungen bieten eine Reihe von Abstraktionen und APIs, die das Arbeiten mit Streams einfacher und stabiler machen. Mithilfe von Streamingerweiterungen können Entwickler reaktive Anwendungen erstellen, die auf strukturierte Weise mit einer Ereignissequenz arbeiten. Durch das Erweiterbarkeitsmodell von Streamanbietern ist das Programmiermodell mit zahlreichen vorhandenen Queuing-Technologien kompatibel und portierbar, z. B. Event Hubs, ServiceBus, Azure Queues und Apache Kafka. Für die Interaktion mit solchen Warteschlangen müssen Entwickler weder speziellen Code schreiben noch dedizierte Prozesse ausführen.
Warum ist das für mich wichtig?
Wenn Sie sich bereits mit der Streamverarbeitung auskennen und mit Technologien wie Event Hubs, Kafka, Azure Stream Analytics, Apache Storm, Apache Spark Streaming und reaktiven Erweiterungen (Rx) in .NET vertraut sind, fragen Sie sich vielleicht, warum dieses Thema für Sie wichtig ist. Warum benötigen wir ein weiteres System für die Streamverarbeitung und welcher Zusammenhang besteht zwischen Akteuren und Streams?Der Artikel „Was spricht für Orleans Streams?“ soll diese Frage beantworten.
Programmiermodell
Das Orleans Streams-Programmiermodell basiert auf mehreren Prinzipien:
- Orleans-Streams sind virtuell. Das bedeutet, dass ein Stream immer vorhanden ist. Er wird nicht explizit erstellt oder zerstört und kann nie fehlschlagen.
- Streams werden durch Stream-IDs gekennzeichnet. Dies sind logische Namen, die aus GUIDs und Zeichenfolgen bestehen.
- Mit Orleans Streams können Sie die Generierung von Daten sowohl zeitlich als auch räumlich von der Verarbeitung entkoppeln. Dies bedeutet, dass sich der Ersteller und der Consumer des Streams auf unterschiedlichen Servern oder in verschiedenen Zeitzonen befinden können und widerstandsfähig gegenüber Fehlern sind.
- Orleans-Streams sind schlank und dynamisch. Orleans Streaming Runtime kann zahlreiche Streams verarbeiten, die mit hoher Geschwindigkeit eingehen und ausgehen.
- Orleans-Stream-Bindungen sind dynamisch. Orleans Streaming Runtime unterstützt Szenarien, in denen Grains mit hoher Geschwindigkeit mit Streams verbunden und wieder getrennt werden.
- Orleans Streaming Runtime verwaltet den Lebenszyklus der Streamnutzung auf transparente Weise. Nachdem eine Anwendung einen Stream abonniert hat, empfängt sie die Ereignisse des Streams, auch wenn Fehler auftreten.
- Orleans-Streams funktionieren einheitlich über Grains und Orleans-Clients hinweg.
Programmieren von APIs
Anwendungen interagieren über Orleans.Streams.IAsyncStream<T> mit Streams, wodurch die Orleans.Streams.IAsyncObserver<T>- und Orleans.Streams.IAsyncObservable<T>-Schnittstellen implementiert werden. Diese APIs ähneln den bekannten reaktiven Erweiterungen (Rx) in .NET.
Unten wird ein typisches Beispiel gezeigt, in dem ein Gerät Daten generiert, die als HTTP-Anforderung an den in der Cloud ausgeführten Dienst gesendet werden. Der Orleans-Client, der auf dem Front-End-Server ausgeführt wird, empfängt diesen HTTP-Aufruf und veröffentlicht die Daten in einem passenden Gerätestream:
public async Task OnHttpCall(DeviceEvent deviceEvent)
{
// Post data directly into the device's stream.
IStreamProvider streamProvider =
GrainClient.GetStreamProvider("MyStreamProvider");
IAsyncStream<DeviceEventData> deviceStream =
streamProvider.GetStream<DeviceEventData>(
deviceEvent.DeviceId, "MyNamespace");
await deviceStream.OnNextAsync(deviceEvent.Data);
}
In einem weiteren unten gezeigten Beispiel tritt ein Chatbenutzer (implementiert als Orleans Grain) einem Chatroom bei, ruft einen Handle für einen Stream mit Chatnachrichten ab, die von allen anderen Benutzern in diesem Raum generiert werden, und abonniert ihn. Beachten Sie, dass der Chatbenutzer nicht über das Chatroom-Grain selbst informiert werden muss (möglicherweise gibt es kein solches Grain in unserem System). Er muss auch nicht über andere Benutzer in dieser Gruppe Bescheid wissen, die Nachrichten erstellen. Zum Veröffentlichen von Inhalt im Chatstream müssen Benutzer selbstverständlich nicht wissen, wer den Stream gerade abonniert hat. Dies zeigt, wie Chatbenutzer zeitlich und räumlich vollständig entkoppelt sein können.
public class ChatUser: Grain
{
public async Task JoinChat(Guid chatGroupId)
{
IStreamProvider streamProvider =
base.GetStreamProvider("MyStreamProvider");
IAsyncStream<string> chatStream =
streamProvider.GetStream<string>(chatGroupId, "MyNamespace");
await chatStream.SubscribeAsync(
async (message, token) => Console.WriteLine(message))
}
}
Schnellstartbeispiel
Das Schnellstartbeispiel bietet eine schnelle Übersicht über den gesamten Workflow bei der Nutzung von Streams in der Anwendung. Nachdem Sie das Beispiel gelesen haben, sollten Sie die Informationen zu den Streams-Programmier-APIs lesen, um ein detailliertes Verständnis der Konzepte zu erlangen.
Streams-Programmier-APIs
Die Informationen unter Streams-Programmier-APIs bieten eine ausführliche Beschreibung der Programmier-APIs.
Streamanbieter
Streams können über physische Kanäle in verschiedenen Formen eingehen und eine unterschiedliche Semantik aufweisen. Orleans Streaming unterstützt diese Vielfalt über das Konzept der Streamanbieter, bei dem es sich um einen Erweiterungspunkt im System handelt. Orleans verfügt derzeit über Implementierungen von zwei Streamanbietern: Simple Message-Streamanbieter auf Basis von TCP und Azure Queue-Streamanbieter auf Basis von Azure Queue. Weitere Informationen zu Streamanbietern finden Sie unter Streamanbieter.
Stream-Semantik
Semantik für Streamabonnements:
Orleans-Streams garantieren die sequenzielle Konsistenz für Streamabonnementvorgänge. Wenn ein Consumer einen Stream abonniert und die Task
für den Abonnementvorgang erfolgreich aufgelöst wurde, sieht der Consumer alle Ereignisse, die nach dem Abonnieren generiert wurden. Darüber hinaus können Sie mithilfe von Rücklauf-Streams einen Stream von einem beliebigen Zeitpunkt in der Vergangenheit aus abonnieren, indem Sie StreamSequenceToken verwenden. Weitere Informationen finden Sie unter Orleans-Streamanbieter.
Garantien für die Zustellung einzelner Streamereignisse:
Garantien für die Zustellung von individuellen Ereignissen richten sich nach dem jeweiligen Streamanbieter. Einige bieten nur eine höchstens einmalige Zustellung nach dem Best-Effort-Prinzip (wie Simple Message Streams, SMS), während andere mindestens eine einmalige Zustellung bieten (wie Azure Queue Streams). Es ist sogar möglich, einen Streaminganbieter zu erstellen, der genau eine Zustellung garantiert (wir haben noch keinen solchen Anbieter, aber es ist möglich, einen zu erstellen).
Reihenfolge für die Zustellung von Ereignissen:
Auch die Ereignisreihenfolge hängt vom jeweiligen Streamanbieter ab. Bei SMS-Streams steuert der Ersteller über die Art der Veröffentlichung explizit, in welcher Reihenfolge der Consumer die Ereignisse sieht. Azure Queue-Streams garantieren keine FIFO-Reihenfolge, da die zugrunde liegenden Azure-Warteschlangen die Reihenfolge im Fehlerfall nicht garantieren. Anwendungen können die Reihenfolge der Streamzustellung auch mithilfe von StreamSequenceToken
steuern.
Implementierung von Streams
Die Orleans Streams-Implementierung bietet eine allgemeine Übersicht über die interne Implementierung.
Codebeispiele
Weitere Beispiele für die Verwendung von Streaming-APIs in einem Grain finden Sie hier. Weitere Beispiele sind für die Zukunft geplant.