Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ta sekcja zawiera ogólne omówienie implementacji Orleans usługi Stream. W tym artykule opisano pojęcia i szczegóły, które nie są widoczne na poziomie aplikacji. Jeśli planujesz używać tylko strumieni, nie musisz odczytywać tej sekcji.
Terminologia:
Odwołujemy się do słowa "queue" jako każdej technologii trwałego przechowywania, która może pozyskiwać zdarzenia strumieniowe i umożliwia pobieranie zdarzeń lub udostępnia mechanizm oparty na wypychaniach do przetwarzania zdarzeń. Zwykle w celu zapewnienia skalowalności te technologie zapewniają partycjonowane kolejki. Na przykład usługa Azure Queues umożliwia tworzenie wielu kolejek, a usługa Event Hubs ma wiele centrów.
Strumienie trwałe
Dostawcy trwałych strumieni Orleans mają wspólną implementację PersistentStreamProvider. Ci dostawcy ogólnego strumienia muszą być skonfigurowani przy użyciu konkretnej IQueueAdapterFactorytechnologii.
Na przykład, do celów testowych mamy adaptery kolejki, które generują swoje dane testowe, zamiast odczytywać dane z kolejki. Poniższy kod pokazuje, jak skonfigurować persistentnego usługodawcę strumienia do użycia naszego niestandardowego adaptera kolejki (generatora). Robi to poprzez skonfigurowanie persistentnego dostawcy strumienia za pomocą funkcji fabrykacyjnej używanej do tworzenia adaptera.
hostBuilder.AddPersistentStreams(
StreamProviderName, GeneratorAdapterFactory.Create);
Gdy nadawca strumienia generuje nowy element strumienia i wywołuje stream.OnNext()
, środowisko wykonawcze Orleans wywołuje odpowiednią metodę na IQueueAdapter tego dostawcy strumienia, który umieszcza element bezpośrednio w odpowiednią kolejkę.
Pozyskiwanie agentów
W centrum dostawcy trwałego strumienia są agenci ściągania. Agenty pobierające pobierają zdarzenia z zestawu trwałych kolejek i dostarczają je do kodu aplikacji w zbiorach, które je wykorzystują. Można traktować ściągających agentów jako rozproszoną "mikrousługę" — partycjonowany, wysoce dostępny i elastyczny składnik rozproszony. Agenty pobierające działają w tych samych silosach, które obsługują ziarna aplikacji i są w pełni zarządzane przez środowisko uruchomieniowe Orleans do przesyłania strumieniowego.
StreamQueueMapper
i StreamQueueBalancer
Agenci ściągający są parametryzowani parametrami IStreamQueueMapper i IStreamQueueBalancer. Element IStreamQueueMapper
zawiera listę wszystkich kolejek i jest również odpowiedzialny za przypisywanie strumieni do kolejek. W ten sposób część producenta dostawcy trwałego strumienia wie, do której kolejki należy umieścić komunikat.
IStreamQueueBalancer
przedstawia, w jaki sposób kolejki są zrównoważone między Orleans silosami i agentami. Celem jest przypisanie kolejek do agentów równomiernie, aby zapobiec wąskim gardłom i wspierać elastyczność. Po dodaniu nowego silosu do klastra Orleans kolejki są automatycznie ponownie zrównoważone w starych i nowych silosach. Element StreamQueueBalancer
umożliwia dostosowywanie tego procesu.
Orleans Ma kilka wbudowanych modułów StreamQueueBalancers, które obsługują różne scenariusze równoważenia (duże i małe liczby kolejek) i różne środowiska (Azure, lokalnie, statyczne).
Korzystając z przykładu generatora testów z powyższego, poniższy kod pokazuje, jak można skonfigurować maper kolejki i moduł równoważenia kolejek.
hostBuilder
.AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
providerConfigurator =>
providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
ob => ob.Configure(options => options.TotalQueueCount = 8))
.UseDynamicClusterConfigDeploymentBalancer());
Powyższy kod konfiguruje GeneratorAdapterFactory tak, aby używał mapera kolejki z ośmioma kolejkami i równoważy kolejki w całym klastrze przy użyciu DynamicClusterConfigDeploymentBalancer.
Protokół ściągania
Każdy silos uruchamia zestaw ściągających agentów, każdy agent ściąga z jednej kolejki. Ściąganie agentów jest implementowane przez wewnętrzny składnik środowiska uruchomieniowego o nazwie SystemTarget. Obiekty SystemTarget (SystemTargets) są zasadniczo ziarnami środowiska uruchomieniowego, podlegają współbieżności jednowątkowej, mogą używać zwykłych komunikatów dla ziaren i są tak lekkie, jak ziarna. SystemTargets, w przeciwieństwie do ziaren, nie są wirtualne: są one jawnie tworzone (przez środowisko uruchomieniowe) i nie są niezależne od lokalizacji. Implementując agentów pobierających jako SystemTargets, Orleans środowisko uruchomieniowe przesyłania strumieniowego może polegać na wbudowanych Orleans funkcjach i może skalować do bardzo dużej liczby kolejek, ponieważ utworzenie nowego agenta pobierającego jest tak tanie, jak utworzenie nowego obiektu typu grain.
Każdy agent ściągania uruchamia okresowy czasomierz, który ściąga z kolejki, wywołując metodę IQueueAdapterReceiver.GetQueueMessagesAsync . Zwrócone komunikaty są umieszczane w wewnętrznej strukturze danych poszczególnych agentów o nazwie IQueueCache. Każdy komunikat jest sprawdzany, aby dowiedzieć się, jaki jest jego strumień docelowy. Agent używa Pub-Sub do znalezienia listy odbiorców strumienia, którzy zasubskrybowali ten strumień. Po pobraniu listy odbiorców agent przechowuje ją lokalnie (w swojej pamięci podręcznej pub-sub), aby nie musiał skonsultować się z Pub-Sub przy każdej wiadomości. Agent również subskrybuje model pub-sub, aby otrzymywać powiadomienia o nowych odbiorcach, którzy subskrybują ten strumień. To uścisk dłoni między agentem a pub-sub gwarantuje silne semantyki subskrypcji przesyłania strumieniowego: kiedy konsument zasubskrybuje strumień, zobaczy wszystkie zdarzenia, które zostały wygenerowane po zasubskrybowaniu. Ponadto użycie StreamSequenceToken
umożliwia subskrybowanie w przeszłości.
Pamięć podręczna kolejki
IQueueCache to wewnętrzna struktura danych dla poszczególnych agentów, która umożliwia oddzielenie procesu usuwania nowych zdarzeń z kolejki od ich dostarczania konsumentom. Umożliwia również oddzielenie dostarczania do różnych strumieni i różnych odbiorców.
Wyobraź sobie sytuację, w której jeden strumień ma 3 odbiorców strumienia, a jeden z nich działa wolno. Jeśli nie zostanie zachowana ostrożność, ten powolny konsument może wpływać na postęp agenta, spowalniając konsumpcję innych odbiorców tego strumienia, a nawet opóźniając pobieranie i dostarczanie zdarzeń dla innych strumieni. Aby temu zapobiec i umożliwić maksymalną równoległość w agencie, używamy IQueueCache
.
IQueueCache
buforuje zdarzenia strumienia i umożliwia agentowi dostarczanie zdarzeń do każdego odbiorcy w jego własnym tempie. Dostarczanie dla każdego konsumenta jest realizowane przez wewnętrzny składnik nazwa IQueueCacheCursor, który śledzi postęp każdego konsumenta. W ten sposób każdy odbiorca otrzymuje zdarzenia we własnym tempie: szybcy odbiorcy odbierają zdarzenia tak szybko, jak są one zdejmowane z kolejki, podczas gdy powolni konsumenci otrzymują je później. Po dostarczeniu komunikatu do wszystkich odbiorców można go usunąć z pamięci podręcznej.
Przeciwnacisk
Backpressure w środowisku uruchomieniowym przesyłania strumieniowego Orleans ma zastosowanie w dwóch miejscach: przełączanie zdarzeń strumienia z kolejki do agenta i dostarczanie zdarzeń z agenta do strumienia odbiorców.
Ten ostatni jest dostarczany przez wbudowany Orleans mechanizm dostarczania komunikatów. Każde zdarzenie strumienia jest dostarczane od agenta do użytkowników za pośrednictwem standardowego przekazu komunikatów 'grain', po kolei. Oznacza to, że agenci wysyłają jedno zdarzenie (lub ograniczony pakiet zdarzeń) do każdego odbiorcy strumienia i czekają na to wywołanie. Następne zdarzenie nie zostanie rozpoczęte, dopóki zadanie dla poprzedniego zdarzenia nie zostanie rozwiązane lub zakończone. W ten sposób naturalnie ograniczamy tempo dostarczania dla każdego konsumenta do jednego komunikatu naraz.
Podczas przenoszenia zdarzeń strumienia z kolejki do agenta Orleans usługa Streaming udostępnia nowy specjalny mechanizm kontrolowania ciśnienia. Ponieważ agent rozdziela odkolejkowywanie zdarzeń z kolejki i dostarczanie ich konsumentom, pojedynczy powolny konsument może się tak opóźnić, że IQueueCache
zapełni się. Aby zapobiec wzrostowi IQueueCache
na czas nieokreślony, ograniczamy jego rozmiar (można skonfigurować limit rozmiaru). Jednak agent nigdy nie wyrzuca niedostarczonych zdarzeń.
Zamiast tego, gdy pamięć podręczna zacznie się wypełniać, agenci spowalniają tempo usuwania zdarzeń z kolejki. Dzięki temu możemy przetrwać powolne okresy dostaw, dostosowując tempo pobierania z kolejki (backpressure - mechanizm kontroli przepływu) i wrócić do szybkiego tempa zużycia później. Aby wykryć "dolinę powolnego dostarczania", IQueueCache
używa wewnętrznej struktury danych zasobników pamięci podręcznej, która śledzi postęp dostarczania zdarzeń do poszczególnych odbiorców strumienia. Skutkuje to bardzo dynamicznym i samonastawijącym się systemem.