Szczegóły implementacji strumieni logicznych

Ta sekcja zawiera ogólne omówienie implementacji usługi Orleans Stream. Opisano w nim pojęcia i szczegóły, które nie są widoczne na poziomie aplikacji. Jeśli planujesz używać tylko strumieni, nie musisz czytać tej sekcji.

Terminologia:

Słowo "kolejka" odnosi się do dowolnej technologii trwałego magazynu, która może pozyskać zdarzenia strumienia i umożliwia ściąganie zdarzeń lub udostępnia mechanizm oparty na wypychaniu, który umożliwia korzystanie ze zdarzeń. Zwykle w celu zapewnienia skalowalności te technologie zapewniają kolejki podzielone na fragmenty/partycje. Na przykład usługa Azure Queues umożliwia tworzenie wielu kolejek, a Event Hubs wiele centrów.

Trwałe strumienie

Wszyscy dostawcy strumienia trwałego Orleans mają wspólną implementację PersistentStreamProvider. Tych ogólnych dostawców strumienia należy skonfigurować przy użyciu technologii IQueueAdapterFactory.

Na przykład do celów testowych mamy karty kolejek, które generują swoje dane testowe, a nie odczytują dane z kolejki. Poniższy kod pokazuje, jak skonfigurować trwałego dostawcę strumienia do korzystania z naszej niestandardowej karty kolejki (generatora). W tym celu należy skonfigurować trwałego dostawcę strumienia za pomocą funkcji fabryki użytej do utworzenia karty.

hostBuilder.AddPersistentStreams(
    StreamProviderName, GeneratorAdapterFactory.Create);

Gdy producent strumienia stream.OnNext()generuje nowy element strumienia i wywołuje metodę , środowisko uruchomieniowe przesyłania strumieniowego Orlean wywołuje IQueueAdapter odpowiednią metodę w dostawcy strumienia, która kolejkuje element bezpośrednio do odpowiedniej kolejki.

Ściąganie agentów

Sednem stałego dostawcy strumienia są agenci ściągania. Ściąganie agentów ściąga zdarzenia z zestawu trwałych kolejek i dostarcza je do kodu aplikacji w ziarnie, które je zużywa. Ściągających agentów można myśleć jak o rozproszonym "mikrousługi" — partycjonowanych, wysoce dostępnych i elastycznych składnikach rozproszonych. Agenci ściągania działają wewnątrz tych samych silosów, które hostują ziarna aplikacji i są w pełni zarządzani przez środowisko Orleans Streaming Runtime.

StreamQueueMapper i StreamQueueBalancer

Agenci ściągania są sparametryzowane za pomocą parametrów i IStreamQueueMapperIStreamQueueBalancer. Zawiera IStreamQueueMapper listę wszystkich kolejek i jest również odpowiedzialny za mapowanie strumieni do kolejek. Dzięki temu strona producenta dostawcy trwałego strumienia wie, do której kolejki należy kolejkować komunikat.

Wyrażenie IStreamQueueBalancer wyraża sposób równoważenia kolejek w silosach Orlean i agentach. Celem jest przypisanie kolejek do agentów w zrównoważony sposób, aby zapobiec wąskim gardłom i obsługiwać elastyczność. Po dodaniu nowego silosu do klastra Orleans kolejki są automatycznie ponownie równoważenia między starym i nowym silosem. Umożliwia StreamQueueBalancer dostosowywanie tego procesu. System Orleans ma kilka wbudowanych usług StreamQueueBalancers, które obsługują różne scenariusze równoważenia (duża i niewielka liczba kolejek) i różne środowiska (platforma Azure, środowisko wstępne, statyczne).

Korzystając z powyższego przykładu generatora testów, poniższy kod pokazuje, jak można skonfigurować mapowanie kolejek i usługę queue balancer.

hostBuilder
    .AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
        providerConfigurator =>
        providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
            ob => ob.Configure(options => options.TotalQueueCount = 8))
      .UseDynamicClusterConfigDeploymentBalancer());

Powyższy kod konfiguruje program do GeneratorAdapterFactory używania mapowania kolejek z 8 kolejkami i równoważy kolejki w klastrze przy użyciu .DynamicClusterConfigDeploymentBalancer

Protokół ściągania

Każdy silos uruchamia zestaw agentów ściągania, a każdy agent ściąga z jednej kolejki. Ściąganie agentów jest implementowane przez wewnętrzny składnik środowiska uruchomieniowego o nazwie SystemTarget. Elementy SystemTarget są zasadniczo ziarnami środowiska uruchomieniowego, podlegają współbieżności jednowątkowej, mogą używać zwykłych komunikatów o ziarnie i są tak lekkie, jak ziarno. W przeciwieństwie do ziarna, systemTargets nie są wirtualne: są tworzone jawnie (przez środowisko uruchomieniowe) i nie są przezroczyste lokalizacji. Implementując ściąganie agentów jako systemTargets, środowisko Orleans Streaming Runtime może polegać na wbudowanych funkcjach Orleans i skalować do bardzo dużej liczby kolejek, ponieważ utworzenie nowego agenta ściągania jest tak tanie, jak utworzenie nowego ziarna.

Każdy agent ściągania uruchamia okresowy czasomierz, który ściąga z kolejki przez wywołania IQueueAdapterReceiver.GetQueueMessagesAsync metody . Zwrócone komunikaty są umieszczane w wewnętrznej strukturze danych na agenta o nazwie IQueueCache. Każdy komunikat jest sprawdzany w celu znalezienia jego strumienia docelowego. Agent korzysta z Pub-Sub, aby znaleźć listę odbiorców strumienia, którzy zasubskrybowali ten strumień. Po pobraniu listy odbiorców agent przechowuje ją lokalnie (w swojej pamięci podręcznej pub-sub), dzięki czemu nie musi skonsultować się z Pub-Sub dotyczącej każdego komunikatu. Agent subskrybuje również subskrypcję pub-sub, aby otrzymywać powiadomienia o nowych użytkownikach, którzy subskrybują ten strumień. To uściślicie między agentem a subskrypcją pub-sub gwarantuje silną semantykę subskrypcji przesyłania strumieniowego : po zasubskrybowania strumienia przez użytkownika będą widzieć wszystkie zdarzenia, które zostały wygenerowane po zasubskrybowania. Ponadto użycie funkcji umożliwia StreamSequenceToken subskrybowanie w przeszłości.

Pamięć podręczna kolejek

IQueueCache to wewnętrzna struktura danych na agenta, która umożliwia oddzielenie nowych zdarzeń od kolejki i dostarczenie ich klientom. Umożliwia również rozdzielenie dostarczania do różnych strumieni i różnych odbiorców.

Imagine sytuacji, w której jeden strumień ma 3 odbiorców strumienia, a jeden z nich jest wolny. Jeśli ten powolny użytkownik nie zostanie zadbany, może mieć wpływ na postęp agenta, spowalniając zużycie przez innych odbiorców tego strumienia, a nawet spowalniając kolejkowanie i dostarczanie zdarzeń dla innych strumieni. Aby temu zapobiec i umożliwić maksymalną równoległość w agencie, użyjemy .IQueueCache

IQueueCache buforuje zdarzenia strumienia i umożliwia agentowi dostarczanie zdarzeń do każdego konsumenta we własnym tempie. Dostarczanie na konsumenta jest implementowane przez składnik wewnętrzny o nazwie IQueueCacheCursor, który śledzi postęp na konsumenta. Dzięki temu każdy konsument odbiera zdarzenia we własnym tempie: szybko odbiorcy odbierają zdarzenia tak szybko, jak są z kolejki, podczas gdy powolni odbiorcy odbierają je później. Gdy komunikat zostanie dostarczony do wszystkich odbiorców, można go usunąć z pamięci podręcznej.

Backpressure

Kompresja w środowisku Orleans Streaming Runtime ma zastosowanie w dwóch miejscach: przy przesyłaniu zdarzeń strumienia z kolejki do agenta i dostarczaniu zdarzeń z agenta do odbiorców strumienia.

Drugi z nich jest dostarczany przez wbudowany mechanizm dostarczania komunikatów orleanowych. Każde zdarzenie strumienia jest dostarczane z agenta do klientów za pośrednictwem standardowej obsługi komunikatów ziarna orleanu, po jednej na raz. Oznacza to, że agenci wysyłają jedno zdarzenie (lub partię zdarzeń o ograniczonym rozmiarze) do każdego konsumenta strumienia i oczekują na to wywołanie. Następne zdarzenie nie zostanie dostarczone, dopóki zadanie dla poprzedniego zdarzenia nie zostanie rozwiązane lub nie zostanie przerwane. W ten sposób naturalnie ograniczamy szybkość dostarczania na konsumenta do jednego komunikatu na raz.

Podczas przesyłania zdarzeń strumienia z kolejki do agenta przesyłanie strumieniowe orlean udostępnia nowy specjalny mechanizm backpressure. Ponieważ agent oddzieli zdarzenia od kolejki i dostarcza je klientom, IQueueCache pojedynczy powolny konsument może w tyle zapełnić się. Aby zapobiec IQueueCache nieograniczonemu wzrostowi, ograniczamy jego rozmiar (limit rozmiaru można skonfigurować). Jednak agent nigdy nie zgłasza zdarzeń niedostarczonych.

Zamiast tego, gdy pamięć podręczna zacznie się zapełniać, agenci spowalniają szybkość kolejkowania zdarzeń z kolejki. Dzięki temu możemy "zjeżdżać" powolne okresy dostarczania, dostosowując szybkość, z jaką zużywamy z kolejki ("backpressure") i powracając do szybkich wskaźników zużycia później. Aby wykryć dolinę "powolnego dostarczania" IQueueCache , program używa wewnętrznej struktury danych zasobników pamięci podręcznej, która śledzi postęp dostarczania zdarzeń do poszczególnych odbiorców strumienia. Skutkuje to bardzo dynamicznym i samonadpasowym systemem.