Przeczytaj w języku angielskim

Udostępnij za pośrednictwem


Przesyłanie strumieniowe z Orleanem

Orlean v.1.0.0 dodał obsługę rozszerzeń przesyłania strumieniowego do modelu programowania. Rozszerzenia przesyłania strumieniowego zapewniają zestaw abstrakcji i interfejsów API, które sprawiają, że myślenie o strumieniach i praca z nimi jest prostsze i bardziej niezawodne. Rozszerzenia przesyłania strumieniowego umożliwiają deweloperom pisanie reaktywnych aplikacji działających na sekwencji zdarzeń w sposób ustrukturyzowany. Model rozszerzalności dostawców strumieni sprawia, że model programowania jest zgodny z i przenośnym w wielu istniejących technologiach kolejkowania, takich jak Event Hubs, ServiceBus, Azure Queues i Apache Kafka. Nie ma potrzeby pisania specjalnego kodu ani uruchamiania dedykowanych procesów w celu interakcji z takimi kolejkami.

Dlaczego powinienem się obchodzić?

Jeśli znasz już wszystkie informacje o przetwarzaniu strumienia i znasz technologie, takie jak Event Hubs, Kafka, Azure Stream Analytics, Apache Storm, Przesyłanie strumieniowe platformy Apache Spark i reaktywne rozszerzenia (Rx) na platformie .NET, możesz zadać pytanie, dlaczego warto dbać. Dlaczego potrzebujemy jeszcze innego systemu przetwarzania strumieniowego i sposobu, w jaki aktorzy są związani z Strumienie?"Dlaczego Orlean Strumienie?" ma odpowiedzieć na to pytanie.

Model programowania

Istnieje kilka zasad dotyczących modelu programowania orleańskiego Strumienie:

  1. Strumienie Orleanu są wirtualne. Oznacza to, że strumień zawsze istnieje. Nie jest jawnie tworzony lub niszczony i nigdy nie może zakończyć się niepowodzeniem.
  2. Strumienie są identyfikowane przez identyfikatory strumienia, które są po prostu nazwami logicznymi składającymi się z identyfikatorów GUID i ciągów.
  3. Orlean Strumienie umożliwia oddzielenie generowania danych od ich przetwarzania, zarówno w czasie, jak i w przestrzeni. Oznacza to, że producent strumienia i odbiorca strumienia mogą znajdować się na różnych serwerach lub w różnych strefach czasowych i wytrzyma awarie.
  4. Strumienie Orleanu są lekkie i dynamiczne. Orleans Streaming Runtime jest przeznaczony do obsługi dużej liczby strumieni, które przychodzą i idą z dużą szybkością.
  5. Powiązania strumienia orleańskiego są dynamiczne. Środowisko uruchomieniowe przesyłania strumieniowego Orleanu jest przeznaczone do obsługi przypadków, w których ziarna łączą się z strumieniami i odłączają je od strumieni o wysokiej szybkości.
  6. Środowisko uruchomieniowe przesyłania strumieniowego Orleanu w sposób niewidoczny zarządza cyklem życia zużycia strumienia. Po zasubskrybowaniu strumienia aplikacja otrzyma zdarzenia strumienia, nawet w przypadku awarii.
  7. Strumienie Orleanu działają jednolicie w ziarnach i klientach Orleanu.

Interfejsy API programowania

Aplikacje wchodzą w interakcje ze strumieniami przy użyciu metody Orleans.Streams.IAsyncStream<T>, która implementuje Orleans.Streams.IAsyncObserver<T> interfejsy i Orleans.Streams.IAsyncObservable<T> . Te interfejsy API są podobne do dobrze znanych reaktywnych rozszerzeń (Rx) na platformie .NET.

W typowym przykładzie poniżej urządzenie generuje pewne dane, które są wysyłane jako żądanie HTTP do usługi uruchomionej w chmurze. Klient Orleanu uruchomiony na serwerze frontonu odbiera to wywołanie HTTP i publikuje dane w pasującym strumieniu urządzenia:

C#
public async Task OnHttpCall(DeviceEvent deviceEvent)
{
     // Post data directly into the device's stream.
     IStreamProvider streamProvider =
        GrainClient.GetStreamProvider("MyStreamProvider");

    IAsyncStream<DeviceEventData> deviceStream =
        streamProvider.GetStream<DeviceEventData>(
            deviceEvent.DeviceId, "MyNamespace");

     await deviceStream.OnNextAsync(deviceEvent.Data);
}

W innym przykładzie poniżej użytkownik czatu (zaimplementowany jako Orleans Grain) dołącza do pokoju rozmów, dostaje uchwyt do strumienia wiadomości czatu generowanych przez wszystkich innych użytkowników w tym pokoju i subskrybuje go. Zwróć uwagę, że użytkownik czatu nie musi wiedzieć o samym ziarnie pokoju rozmów (może nie być takiego ziarna w naszym systemie) ani o innych użytkownikach w tej grupie, którzy generują wiadomości. Nie trzeba powiedzieć, aby opublikować strumień czatu, użytkownicy nie muszą wiedzieć, kto jest obecnie subskrybowany do strumienia. Pokazuje to, jak użytkownicy czatów mogą być całkowicie oddzielone w czasie i przestrzeni.

C#
public class ChatUser: Grain
{
    public async Task JoinChat(Guid chatGroupId)
    {
        IStreamProvider streamProvider =
            base.GetStreamProvider("MyStreamProvider");

        IAsyncStream<string> chatStream =
            streamProvider.GetStream<string>(chatGroupId, "MyNamespace");

        await chatStream.SubscribeAsync(
            async (message, token) => Console.WriteLine(message))
    }
}

Przykład Szybkiego startu

Przykład Szybki start to dobry szybki przegląd ogólnego przepływu pracy korzystania ze strumieni w aplikacji. Po przeczytaniu należy przeczytać interfejsy API programowania Strumienie, aby lepiej zrozumieć pojęcia.

interfejsy API programowania Strumienie

Interfejsy API programowania Strumienie zawiera szczegółowy opis interfejsów API programowania.

Dostawcy usługi Stream

Strumienie mogą pochodzić za pośrednictwem kanałów fizycznych różnych kształtów i formularzy i mogą mieć różne semantyki. Orleans Streaming jest przeznaczony do wspierania tej różnorodności za pośrednictwem koncepcji dostawców strumienia, który jest punktem rozszerzalności w systemie. Orlean ma obecnie implementacje dwóch dostawców strumieni: oparty na protokole TCP dostawca prostego strumienia komunikatów i dostawca usługi Azure Queue Stream oparty na kolejce platformy Azure. Więcej szczegółów na temat dostawców usługi Stream można znaleźć w artykule Dostawcy usługi Stream.

Semantyka strumienia

Semantyka subskrypcji strumienia:

Orlean Strumienie zagwarantować sekwencyjną spójność dla operacji subskrypcji usługi Stream. W szczególności, gdy użytkownik subskrybuje strumień, po Task pomyślnym rozwiązaniu operacji reprezentowania subskrypcji użytkownik zobaczy wszystkie zdarzenia wygenerowane po jego zasubskrybowaniu. Ponadto strumienie z możliwością przewijania umożliwiają subskrybowanie dowolnego punktu w czasie w przeszłości przy użyciu polecenia StreamSequenceToken. Aby uzyskać więcej informacji, zobacz Dostawcy strumieni Orleanu.

Gwarancje dostarczania zdarzeń pojedynczego strumienia:

Indywidualne gwarancje dostarczania zdarzeń zależą od poszczególnych dostawców strumienia. Niektóre zapewniają tylko najlepsze wysiłki podczas dostarczania maksymalnie raz (na przykład proste komunikaty Strumienie (SMS)), podczas gdy inne zapewniają co najmniej jednokrotne dostarczanie (np. usługa Azure Queue Strumienie). Istnieje nawet możliwość utworzenia dostawcy przesyłania strumieniowego, który gwarantuje dokładnie jednokrotne dostarczanie (nie mamy jeszcze takiego dostawcy, ale istnieje możliwość utworzenia takiego dostawcy).

Kolejność dostarczania zdarzeń:

Kolejność zdarzeń zależy również od określonego dostawcy strumienia. W SMS strumieniach producent jawnie kontroluje kolejność zdarzeń postrzeganych przez konsumenta, kontrolując sposób ich publikowania. Strumienie kolejki platformy Azure nie gwarantują kolejności FIFO, ponieważ bazowe kolejki platformy Azure nie gwarantują kolejności w przypadkach awarii. Aplikacje mogą również kontrolować kolejność dostarczania strumienia przy użyciu polecenia StreamSequenceToken.

implementacja Strumienie

Implementacja Strumienie Orleanu zawiera ogólne omówienie implementacji wewnętrznej.

Przykłady kodu

Więcej przykładów korzystania z interfejsów API przesyłania strumieniowego w ramach ziarna można znaleźć tutaj. Planujemy utworzyć więcej przykładów w przyszłości.

Zobacz też