Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questa sezione offre una panoramica generale dell'implementazione di Orleans Stream. Descrive i concetti e i dettagli che non sono visibili a livello di applicazione. Se si prevede di usare solo flussi, non è necessario leggere questa sezione.
Terminologia:
Con il termine "queue" si intende qualsiasi tecnologia di archiviazione durevole capace di gestire eventi di flusso e che consente di estrarre eventi o fornisce un meccanismo di push per consumare eventi. In genere, per garantire la scalabilità, tali tecnologie forniscono code shardate/partizionate. Ad esempio, le code di Azure consentono di creare più code e gli Event Hubs hanno più hub.
Flussi persistenti
Tutti i Orleans provider di flussi persistenti condividono un'implementazione PersistentStreamProvidercomune. Questi provider di flussi generici devono essere configurati con una tecnologia specifica IQueueAdapterFactory.
Ad esempio, a scopo di test, sono disponibili adattatori di coda che generano i dati di test anziché leggere i dati da una coda. Il codice seguente illustra come configurare un provider di flussi persistente per utilizzare il nostro adattatore personalizzato di coda (generatore). A tale scopo, il provider di stream persistente viene configurato tramite una funzione fabbrica utilizzata per creare l'adattatore.
hostBuilder.AddPersistentStreams(
StreamProviderName, GeneratorAdapterFactory.Create);
Quando un produttore di flusso genera un nuovo elemento di flusso e chiama stream.OnNext(), il runtime di streaming Orleans richiama il metodo appropriato sul IQueueAdapter di quel provider di flusso, che accoda l'elemento direttamente nella coda appropriata.
Agenti di attrazione
Al centro del Provider di flusso persistente ci sono gli agenti di estrazione. Gli agenti di pull estraggono gli eventi da un set di code durevoli e li recapitano al codice dell'applicazione in grani che li utilizza. È possibile considerare gli agenti di pull come un "micro-servizio" distribuito, ovvero un componente partizionato, altamente disponibile ed elastico. Gli agenti di estrazione vengono eseguiti all'interno degli stessi silo che ospitano i grani dell'applicazione e sono completamente gestiti dal Orleans Streaming Runtime.
StreamQueueMapper e StreamQueueBalancer
Gli agenti di pull sono parametrizzati con IStreamQueueMapper e IStreamQueueBalancer.
IStreamQueueMapper fornisce un elenco di tutte le code ed è anche responsabile dell'assegnazione dei flussi alle code. In questo modo, il lato del produttore del fornitore di flusso persistente conosce la coda in cui accodare il messaggio.
IStreamQueueBalancer Esprime il modo in cui le code vengono bilanciate tra Orleans silos e agenti. L'obiettivo è assegnare code agli agenti in modo bilanciato, per evitare colli di bottiglia e favorire l'elasticità. Quando viene aggiunto un nuovo silo al Orleans cluster, le code vengono ribilanciate automaticamente tra i silo precedenti e nuovi.
StreamQueueBalancer consente di personalizzare tale processo.
Orleans include diversi streamQueueBalancer predefiniti, per supportare diversi scenari di bilanciamento (numero elevato e ridotto di code) e diversi ambienti (Azure, locale, statico).
Usando l'esempio del generatore di test riportato sopra, il codice seguente illustra come configurare il mapper della coda e il servizio di bilanciamento della coda.
hostBuilder
.AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
providerConfigurator =>
providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
ob => ob.Configure(options => options.TotalQueueCount = 8))
.UseDynamicClusterConfigDeploymentBalancer());
Il codice precedente configura GeneratorAdapterFactory per utilizzare un mapper delle code con otto code e distribuisce le code nel cluster usando DynamicClusterConfigDeploymentBalancer.
Protocollo di estrazione
Ogni silo esegue un set di agenti di estrazione, e ogni agente preleva da una coda. Gli agenti di pull stessi vengono implementati da un componente di runtime interno, denominato SystemTarget. I systemTarget sono essenzialmente grani di runtime, sono soggetti a concorrenza a thread singolo, possono usare la messaggistica granulare regolare e sono leggeri come grani. A differenza dei grani, i SystemTargets non sono virtuali: vengono creati esplicitamente (dal runtime) e non sono trasparenti rispetto alla posizione. Implementando agenti di pull come SystemTargets, il Orleans runtime di streaming può basarsi sulle funzionalità predefinite Orleans e può essere ridimensionato a un numero molto elevato di code, poiché la creazione di un nuovo agente di pull è economica quanto la creazione di un nuovo grano.
Ogni agente di pull utilizza un timer periodico per eseguire il pull dalla coda richiamando il metodo IQueueAdapterReceiver.GetQueueMessagesAsync. I messaggi restituiti vengono inseriti nella struttura dei dati interna per agente denominata IQueueCache. Ogni messaggio viene controllato per individuare il flusso di destinazione. L'agente usa il Pub-Sub per individuare l'elenco dei consumatori del flusso che si sono iscritti a questo flusso. Una volta recuperato l'elenco di consumer, l'agente lo memorizza in locale (nella relativa cache pub-sub) in modo da non dover consultare Pub-Sub per ogni messaggio. L'agente partecipa anche al sistema pub-sub per ricevere una notifica di tutti i nuovi consumatori che sottoscrivono tale flusso. Questo handshake tra l'agente e il pub-sub garantisce una semantica di sottoscrizione di streaming avanzata: una volta che il consumer ha sottoscritto il flusso, visualizzerà tutti gli eventi generati dopo la sottoscrizione. Inoltre, l'uso di StreamSequenceToken consente di sottoscrivere nel passato.
Cache di coda
IQueueCache è una struttura di dati interna per agente che consente di disaccoppiare i nuovi eventi dalla coda e di distribuirli ai consumer. Consente anche di disaccoppiare il recapito a flussi diversi e a consumatori diversi.
Immagina una situazione in cui un flusso abbia 3 consumatori di streaming e uno di essi sia lento. Se non si presta attenzione, questo consumer lento può influire sui progressi dell'agente, rallentando l'elaborazione di altri consumer di quel flusso e persino rallentando l'estrazione dalla coda e la consegna di eventi per altri flussi. Per evitare ciò e consentire il massimo parallelismo nell'agente, viene usato IQueueCache.
IQueueCache memorizza nel buffer gli eventi di flusso e consente all'agente di distribuire gli eventi a ciascun consumatore secondo il suo ritmo. Il recapito per-consumatore viene implementato dal componente interno denominato IQueueCacheCursor, che tiene traccia dei progressi di ciascun consumatore. In questo modo, ogni consumer riceve gli eventi al proprio ritmo: i consumer veloci ricevono gli eventi appena vengono dequeuati dalla coda, mentre i consumer lenti li ricevono in un secondo momento. Una volta recapitato il messaggio a tutti i consumer, può essere eliminato dalla cache.
Contropressione
La contropressione nel Orleans sistema di esecuzione di streaming si applica in due posizioni: portare gli eventi di flusso dalla coda all'agente e recapitare gli eventi dall'agente ai consumatori di flusso.
Quest'ultimo viene fornito dal meccanismo di recapito dei messaggi predefinito Orleans . Ogni evento di flusso viene recapitato dall'agente ai consumer tramite la messaggistica di granularità standard Orleans , una alla volta. Ovvero, gli agenti inviano un evento (o un batch di eventi di dimensioni limitate) a ogni consumatore di flusso e attendono questa risposta. L'evento successivo non verrà erogato finché l'attività relativa all'evento precedente non sarà stata risolta o bloccata. In questo modo si limita naturalmente la frequenza di recapito per consumer a un messaggio alla volta.
Quando si trasferiscono eventi di flusso dalla coda all'agente, Orleans Streaming fornisce un nuovo meccanismo speciale di Backpressure. Poiché l'agente disaccoppia la coda degli eventi e li distribuisce ai consumer, un singolo consumer lento può restare in ritardo in modo tale che l'oggetto IQueueCache si riempia. Per evitare di IQueueCache aumentare a tempo indefinito, limitiamo le dimensioni (il limite di dimensioni è configurabile). Tuttavia, l'agente non scarta mai gli eventi non recapitati.
Al contrario, quando la cache inizia a riempirsi, gli agenti rallentano la velocità con cui gli eventi vengono rimossi dalla coda. In questo modo, possiamo gestire i periodi di consegna lenta regolando la velocità con cui consumiamo dalla coda ("backpressure") e tornare a velocità di consumo elevate in un secondo momento. Per rilevare le "valle del recapito lento", IQueueCache utilizza una struttura di dati interna dei bucket della cache che tiene traccia dello stato di avanzamento del recapito degli eventi ai singoli consumatori di flussi. Ciò comporta un sistema molto reattivo e auto-regolante.