Partager via


Détails de l’implémentation des flux Orleans

Cette section fournit une vue d’ensemble générale de l’implémentation des flux Orleans. Elle décrit les concepts et les détails qui ne sont pas visibles au niveau de l’application. Si vous envisagez uniquement d’utiliser des flux, vous n’avez pas besoin de lire cette section.

Terminologie :

Avec le terme « file d’attente », nous faisons référence à toute technologie de stockage durable capable d’ingérer des événements de flux et qui permet de tirer (pull) des événements ou qui fournit un mécanisme de type push pour consommer des événements. Généralement, pour assurer la scalabilité, ces technologies fournissent des files d’attente fragmentées/partitionnées. Par exemple, les files d’attente Azure vous permettent de créer plusieurs files d’attente et le service Event Hubs a plusieurs hubs.

Flux persistants

Tous les fournisseurs de flux persistants d’Orleans partagent une implémentation PersistentStreamProvider commune. Ces fournisseurs de flux génériques doivent être configurés avec un élément IQueueAdapterFactory spécifique à la technologie.

Par exemple, à des fins de test, nous avons des adaptateurs de file d’attente qui génèrent leurs données de test au lieu de lire les données à partir d’une file d’attente. Le code ci-dessous montre comment configurer un fournisseur de flux persistant pour utiliser notre adaptateur de file d’attente personnalisé (générateur). Il le fait en configurant le fournisseur de flux persistant avec une fonction de fabrique utilisée pour créer l’adaptateur.

hostBuilder.AddPersistentStreams(
    StreamProviderName, GeneratorAdapterFactory.Create);

Quand un producteur de flux génère un nouvel élément de flux et appelle stream.OnNext(), le runtime de streaming Orleans appelle la méthode appropriée sur le IQueueAdapter de ce fournisseur de flux qui met en file d’attente l’élément directement sur la file d’attente appropriée.

Agents de tirage

Les agents de tirage figurent au cœur du fournisseur de flux persistant. Les agents de tirage tirent (pull) des événements à partir d’un ensemble de files d’attente durables et les remettent au code d’application en grains qu’il consomme. Les agents de tirage peuvent être vus comme un « micro-service » distribué, un composant distribué, partitionné, hautement disponible et élastique. Les agents de tirage s’exécutent à l’intérieur des mêmes silos que ceux qui hébergent les grains d’application et sont entièrement gérés par le runtime de streaming Orleans.

StreamQueueMapper et StreamQueueBalancer

Les agents de tirage sont paramétrés avec IStreamQueueMapper et IStreamQueueBalancer. IStreamQueueMapper fournit la liste de toutes les files d’attente et est également responsable du mappage des flux sur les files d’attente. De cette façon, le côté producteur du fournisseur de flux persistant sait dans quelle file d’attente placer le message.

IStreamQueueBalancer exprime la façon dont les files d’attente sont équilibrées entre les silos et les agents Orleans. L’objectif est d’affecter des files d’attente aux agents de manière équilibrée, afin d’éviter les goulots d’étranglement et de prendre en charge l’élasticité. Quand un nouveau silo est ajouté au cluster Orleans, les files d’attente sont rééquilibrées automatiquement entre les anciens et les nouveaux silos. StreamQueueBalancer permet de personnaliser ce processus. Orleans dispose de plusieurs StreamQueueBalancers intégrés pour prendre en charge différents scénarios d’équilibrage (grand et petit nombres de files d’attente) et différents environnements (Azure, local, statique).

À l’aide de l’exemple de générateur de test ci-dessus, le code ci-dessous montre comment configurer le mappeur de file d’attente et l’équilibreur de file d’attente.

hostBuilder
    .AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
        providerConfigurator =>
        providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
            ob => ob.Configure(options => options.TotalQueueCount = 8))
      .UseDynamicClusterConfigDeploymentBalancer());

Le code ci-dessus configure GeneratorAdapterFactory pour utiliser un mappeur de file d’attente avec huit files d’attente et équilibre les files d’attente sur le cluster à l’aide de DynamicClusterConfigDeploymentBalancer.

Protocole de tirage (pull)

Chaque silo exécute un ensemble d’agents de tirage et chaque agent effectue le tirage à partir d’une file d’attente. Les agents de tirage eux-mêmes sont implémentés par un composant de runtime interne, appelé SystemTarget. Les composants SystemTarget sont essentiellement des grains de runtime, sont soumis à une concurrence à thread unique, peuvent utiliser la messagerie de grain standard et sont aussi légers que les grains. Contrairement aux grains, les composants SystemTarget ne sont pas virtuels : ils sont explicitement créés (par le runtime) et ne sont pas transparents pour l’emplacement. En implémentant des agents de tirage en tant que composants SystemTarget, le runtime de streaming Orleans peut s’appuyer sur des fonctionnalités Orleans intégrées et peut être mis à l’échelle pour un très grand nombre de files d’attente, car la création d’un nouvel agent de tirage ne coûte pas plus que celle d’un nouveau grain.

Chaque agent de tirage exécute un minuteur périodique qui effectue un tirage à partir de la file d’attente en appelant la méthode IQueueAdapterReceiver.GetQueueMessagesAsync. Les messages retournés sont placés dans la structure de données interne par agent appelée IQueueCache. Chaque message est inspecté pour mettre à jour son flux de destination. L’agent utilise Pub-Sub pour déterminer la liste des consommateurs de flux qui se sont abonnés à ce flux. Une fois la liste des consommateurs récupérée, l’agent la stocke localement (dans son cache pub-sub) afin de ne pas avoir à consulter Pub-Sub pour chaque message. L’agent s’abonne également à pub-sub pour recevoir une notification de tous les nouveaux consommateurs qui s’abonnent à ce flux. Cette négociation entre l’agent et pub-sub garantit une sémantique d’abonnement de streaming forte : une fois que le consommateur s’est abonné au flux, il verra tous les événements qui ont été générés après son abonnement. En outre, l’utilisation StreamSequenceToken lui permet de s’abonner dans le passé.

Cache de file d’attente

IQueueCache est une structure de données par agent interne qui permet de dissocier le retrait des nouveaux événements de la file d’attente et de les remettre aux consommateurs. Elle permet également de dissocier la remise à différents flux et à différents consommateurs.

Imaginez une situation où un flux a 3 consommateurs de flux et où l’un d’eux est lent. Si des précautions ne sont pas prises, la lenteur de ce consommateur peut affecter la progression de l’agent, ralentir la consommation d’autres consommateurs de ce flux, et même ralentir le retrait de file d’attente et la remise des événements pour d’autres flux. Pour empêcher cela et favoriser un parallélisme maximal dans l’agent, nous utilisons IQueueCache.

IQueueCache place en mémoire tampon les événements de flux et permet à l’agent de remettre les événements à chaque consommateur à son rythme propre. La remise par consommateur est implémentée par le composant interne appelé IQueueCacheCursor, qui suit la progression par consommateur. De cette façon, chaque consommateur reçoit des événements à son rythme : les consommateurs rapides reçoivent des événements aussi rapidement qu’ils sont supprimés de la file d’attente, tandis que les consommateurs lents les reçoivent plus tard. Une fois le message remis à tous les consommateurs, il peut être supprimé du cache.

Régulation

La régulation dans le runtime de streaming Orleans s’applique dans deux cadres : l’apport d’événements de flux depuis la file d’attente à l’agent et la remise des événements de l’agent à des consommateurs de flux.

Ce dernier cadre est fourni par le mécanisme de remise de messages intégré à Orleans. Chaque événement de flux est remis de l’agent aux consommateurs via la messagerie de grains Orleans standard, un à la fois. Autrement dit, les agents envoient un événement (ou un lot limité d’événements) à chaque consommateur de flux et attendent cet appel. La remise de l’événement suivant ne démarre pas tant que la tâche pour l’événement précédent n’a pas été résolue ou interrompue. De cette façon, nous limitons naturellement le taux de remise par consommateur à un message à la fois.

Lors de l’apport d’événements de flux de la file d’attente à l’agent, le streaming Orleans fournit un nouveau mécanisme de régulation spécial. Comme l’agent dissocie le retrait des événements de la file d’attente et leur remise aux consommateurs, un consommateur lent peut à lui seul prendre un retard tel, que le cache IQueueCache sera rempli. Pour empêcher IQueueCache de croître indéfiniment, nous limitons sa taille (cette limite de taille est configurable). Toutefois, l’agent ne rejette jamais des événements non remis.

Au lieu de cela, quand le cache commence à se remplir, les agents ralentissent le taux de retrait des événements de la file d’attente. De cette façon, nous pouvons « sortir » des périodes de remise lente en ajustant le taux auquel nous consommons les données de la file d’attente (« régulation »), et revenir à des taux de consommation rapides plus tard. Pour détecter les creux de « remise lente », IQueueCache utilise une structure de données interne faite de compartiments de cache, qui suit la progression de la remise d’événements aux consommateurs de flux individuels. Cela génère un système très réactif et auto-ajusté.