Sdílet prostřednictvím


Orleans streams – podrobnosti implementace

Tato část obsahuje základní přehled implementace streamu Orleans . Popisuje koncepty a podrobnosti, které nejsou viditelné na úrovni aplikace. Pokud plánujete používat pouze streamy, nemusíte si tuto část přečíst.

Terminologie:

Slovo "queue" odkazuje na jakoukoli technologii odolného úložiště, která dokáže přijímat události datových proudů a umožňuje buď vyžádání událostí, nebo poskytuje mechanismus pro jejich automatické zpracování. Tyto technologie obvykle poskytují rozdělené nebo segmentované fronty. Fronty Azure například umožňují vytvářet více front a Služba Event Hubs má více center.

Trvalé datové proudy

Všichni Orleans zprostředkovatelé trvalých streamů sdílejí společnou implementaci PersistentStreamProvider. Tito poskytovatelé obecných streamů musí být nakonfigurováni pomocí technologie specifické pro IQueueAdapterFactory.

Pro účely testování máme například adaptéry pro fronty, které jsou navrženy tak, aby generovaly testovací data místo toho, aby je čítaly z fronty. Následující kód ukazuje, jak nakonfigurujeme trvalý poskytovatel streamu tak, aby používal náš vlastní adaptér fronty (generátor). Toto se provádí tím, že nakonfigurujete poskytovatele trvalého toku pomocí funkce pro vytváření, která se používá k vytvoření adaptéru.

hostBuilder.AddPersistentStreams(
    StreamProviderName, GeneratorAdapterFactory.Create);

Když producent streamu vygeneruje novou položku streamu a volá stream.OnNext(), modul runtime streamování Orleans vyvolá příslušnou metodu na IQueueAdapter poskytovateli tohoto streamu, který položku zařadí přímo do příslušné fronty.

Tahající agenti

V srdci poskytovatele trvalých streamů jsou agenti pro stahování. Agenti pro vytahování načítají události ze sady trvalých front a doručují je do kódu aplikace v sekcích, které je využívají. Agenti pro vyžádání si můžete představit jako distribuovanou "mikroslužbu" – distribuovanou, dělenou, vysoce dostupnou a elastickou komponentu. Agenti pro stahování běží ve stejných silech, které hostují komponenty aplikací a jsou plně spravovány běhovým prostředím pro streamování Orleans.

StreamQueueMapper a StreamQueueBalancer

Tahoví agenti jsou parametrizováni pomocí IStreamQueueMapper a IStreamQueueBalancer. IStreamQueueMapper poskytuje seznam všech front a zodpovídá také za mapování datových proudů na fronty. Tímto způsobem výrobce poskytovatele trvalých streamů ví, do které fronty se má zpráva zařadit.

Vyjadřuje IStreamQueueBalancer způsob, jakým jsou fronty vyváženy mezi Orleans silami a agenty. Cílem je přiřadit fronty agentům vyváženým způsobem, aby se zabránilo kritickým bodům a podporovala elasticitu. Když do clusteru Orleans přidáte nové silo, fronty se automaticky znovu rozdělují mezi staré a nové silosy. Umožňuje StreamQueueBalancer přizpůsobit tento proces. Orleans má několik integrovaných StreamQueueBalancers, které podporují různé scénáře vyrovnávání (velký a malý počet front) a různá prostředí (Azure, místní, statická).

Pomocí příkladu generátoru testů z výše uvedeného kódu ukazuje, jak lze nakonfigurovat mapovač fronty a nástroj pro vyrovnávání front.

hostBuilder
    .AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
        providerConfigurator =>
        providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
            ob => ob.Configure(options => options.TotalQueueCount = 8))
      .UseDynamicClusterConfigDeploymentBalancer());

Výše uvedený kód nakonfiguruje GeneratorAdapterFactory tak, aby používal mapovač front s osmi frontami a rozkládá fronty v clusteru pomocí DynamicClusterConfigDeploymentBalancer příkazu.

Protokol stahování

Každé silo provozuje sadu agentů pro stahování, přičemž každý agent stahuje z jedné fronty. Samotné pull agenty jsou implementovány interní komponentou runtime, která se nazývá SystemTarget. SystemTargets jsou v podstatě runtime grainy, podléhají jednovláknové souběžnosti, mohou používat běžné zasílání zpráv a jsou stejně efektivní jako grainy. Na rozdíl od zrn nejsou SystemTargets virtuální: jsou explicitně vytvořeny běhovým prostředím a nejsou transparentní ohledně umístění. Implementací natahujících agentů jako SystemTargets Orleans může modul Runtime streamování spoléhat na integrované Orleans funkce a může škálovat na velmi velký počet front, protože vytvoření nového natahujícího agenta je stejně levné jako vytvoření nové grain.

Každý přenosový agent spouští pravidelný časovač, který načítá z fronty voláním metody IQueueAdapterReceiver.GetQueueMessagesAsync. Vrácené zprávy jsou vloženy do interní datové struktury nazvané IQueueCache, která je určena pro jednotlivé agenty. Každá zpráva se zkontroluje, aby zjistila cílový datový proud. Agent pomocí Pub-Sub zjistí seznam příjemců streamů, kteří se přihlásili k odběru tohoto streamu. Jakmile se seznam příjemců načte, agent ho ukládá místně (v mezipaměti pub-sub), takže se nemusí poradit s Pub-Sub ke každé zprávě. Agent se také přihlásí k odběru pub-sub, aby dostával oznámení o všech nových konzumentech, kteří se přihlásí k odběru tohoto streamu. Toto handshake mezi agentem a mechanismem publikování a odběru zaručuje silnou sémantiku streamovacího odběru: jakmile se příjemce přihlásí k odběru streamu, uvidí všechny události, které byly generovány po přihlášení k odběru. Použití StreamSequenceToken navíc umožňuje přihlásit se k odběru v minulosti.

Mezipaměť front

IQueueCache je interní datová struktura pro každého agenta, která umožňuje oddělení procesu odstraňování nových událostí z fronty od jejich doručení příjemcům. Umožňuje také oddělení doručování různým datovým proudům a různým příjemcům.

Představte si situaci, kdy jeden datový proud má 3 odběratele proudu a jeden z nich je pomalý. Pokud se nezajímá, může tento pomalý spotřebitel ovlivnit průběh agenta, zpomalit spotřebu jiných příjemců tohoto datového proudu a dokonce zpomalit vyřazení a doručování událostí pro ostatní streamy. Abychom tomu zabránili a umožnili maximální paralelismus agenta, používáme IQueueCache.

IQueueCache ukládá do vyrovnávací paměti události datového proudu a poskytuje agentovi způsob, jak doručovat události každému příjemci vlastním tempem. Doručení pro jednotlivé spotřebitele je implementováno interní komponentou nazvanou IQueueCacheCursor, která sleduje průběh pro jednotlivé spotřebitele. Každý příjemce tak přijímá události vlastním tempem: rychlé příjemce přijímá události tak rychle, jak jsou vyřazeny z fronty, zatímco pomalé příjemce je obdrží později. Jakmile se zpráva doručí všem příjemcům, můžete ji z mezipaměti odstranit.

Zpětný tlak

Backpressure v prostředí Orleans Streaming Runtime se uplatňuje na dvou místech: při přenosu událostí streamu z fronty do agenta a při doručování událostí z agenta ke koncovým uživatelům streamu.

Druhá je poskytována integrovaným Orleans mechanismem doručování zpráv. Každá událost datového proudu se doručí z agenta příjemcům prostřednictvím standardního Orleans zasílání zpráv o podrobnostech po jednom. To znamená, že agenti odesílají jednu událost (nebo dávku událostí s omezenou velikostí) každému příjemci datového proudu a čekají na toto volání. Další událost se nezačne doručovat, dokud nebyl vyřešen nebo přerušen úkol předchozí události. Tímto způsobem přirozeně omezujeme míru doručení pro jednotlivé příjemce na jednu zprávu najednou.

Při přenosu událostí streamu z fronty do agenta poskytuje Orleans streamování nový speciální mechanismus „Backpressure“. Vzhledem k tomu, že agent oddělí vyřazování událostí z fronty a doručí je příjemcům, může jeden pomalý příjemce zaostávat natolik, že se IQueueCache vyplní. Abychom zabránili IQueueCache neomezenému růstu, omezujeme jeho velikost (limit velikosti je konfigurovatelný). Agent však nikdy nevyhodí nedoručené události.

Místo toho, když se mezipaměť začne zaplňovat, agenti zpomalují rychlost vyřazování událostí z fronty. Tímto způsobem můžeme zvládnout pomalé doby doručení tím, že upravíme tempo, jakým spotřebováváme z fronty ("backpressure"), a později se vrátíme k rychlé spotřebě. K detekci "pomalého doručování" používá IQueueCache interní datovou strukturu slotů mezipaměti, která sleduje průběh doručování událostí jednotlivým uživatelům datových proudů. Výsledkem je velmi responzivní a samočinný systém.