Detalhes da implementação de fluxos do Orleans

Esta seção fornece uma visão geral de alto nível da implementação de fluxos do Orleans. Ela descreve conceitos e detalhes que não são visíveis no nível do aplicativo. Se você pretender usar apenas fluxos, não precisará ler esta seção.

Terminologia:

Nós nos referimos pela palavra "fila" a qualquer tecnologia de armazenamento durável que possa ingerir eventos de fluxo e permita efetuar pull de eventos ou forneça um mecanismo baseado em push para consumir eventos. Normalmente, para fornecer escalabilidade, essas tecnologias fornecem filas fragmentadas/particionadas. Por exemplo, as Filas do Azure permitem criar várias filas, e os Hubs de Eventos têm vários hubs.

Fluxos persistentes

Todos os provedores de fluxo persistente do Orleans compartilham uma implementação PersistentStreamProvider. Esses provedores de fluxo genéricos precisam ser configurados com um IQueueAdapterFactory específico da tecnologia.

Por exemplo, para fins de teste, temos adaptadores de fila que geram dados de teste em vez de ler os dados de uma fila. O código abaixo mostra como configuramos um provedor de fluxo persistente para usar nosso adaptador de fila personalizado (gerador). Ele faz isso configurando o provedor de fluxo persistente com uma função de alocador usada para criar o adaptador.

hostBuilder.AddPersistentStreams(
    StreamProviderName, GeneratorAdapterFactory.Create);

Quando um produtor de fluxo gera um novo item de fluxo e chama stream.OnNext(), o runtime de streaming do Orleans invoca o método apropriado no IQueueAdapter desse provedor de fluxo que enfileira o item diretamente na fila apropriada.

Como efetuar pull de agentes

No centro do provedor de fluxo persistente estão os agentes de pull. Os agentes de pull efetuam pull de eventos de um conjunto de filas duráveis e os entregam ao código do aplicativo na granularidade que os consomem. É possível considerar os agentes de pull como um "microsserviço" distribuído, um componente distribuído particionado, altamente disponível e elástico. Os agentes de pull são executados nos mesmos silos que hospedam a granularidade do aplicativo e são totalmente gerenciados pelo runtime de streaming do Orleans.

StreamQueueMapper e StreamQueueBalancer

Os agentes de pull são parametrizados com IStreamQueueMapper e IStreamQueueBalancer. O IStreamQueueMapper fornece uma lista de todas as filas e é responsável por mapear fluxos para filas. Dessa forma, o lado produtor do provedor de fluxo persistente sabe a fila em que a mensagem deve ser colocada.

O IStreamQueueBalancer expressa a forma como as filas são equilibradas entre os silos e os agentes do Orleans. A meta é atribuir filas aos agentes de maneira equilibrada, para evitar gargalos e dar suporte à elasticidade. Quando um novo silo é adicionado ao cluster do Orleans, as filas são automaticamente rebalanceadas nos silos antigos e novos. O StreamQueueBalancer permite personalizar esse processo. O Orleans tem vários StreamQueueBalancers internos para dar suporte a diferentes cenários de balanceamento (grande e pequeno número de filas) e ambientes diferentes (Azure, local e estático).

Usando o exemplo de gerador de teste acima, o código abaixo mostra como é possível configurar o mapeador de filas e o balanceador de filas.

hostBuilder
    .AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
        providerConfigurator =>
        providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
            ob => ob.Configure(options => options.TotalQueueCount = 8))
      .UseDynamicClusterConfigDeploymentBalancer());

O código acima configura o GeneratorAdapterFactory para usar um mapeador de filas com oito filas e equilibra as filas no cluster usando o DynamicClusterConfigDeploymentBalancer.

Como efetuar pull do protocolo

Cada silo executa um conjunto de agentes de pull, e cada agente efetua pull de uma fila. Os próprios agentes de pull são implementados por um componente de runtime interno, chamado SystemTarget. Os SystemTargets são essencialmente a granularidade de runtime, estão sujeitos à simultaneidade de thread único, podem usar mensagens de granularidade regular e são tão leves quanto à granularidade. Em contraste com a granularidade, os SystemTargets não são virtuais: eles são criados explicitamente (pelo runtime) e não são transparentes de localização. Ao implementar agentes de pull como SystemTargets, o runtime de streaming do Orleans pode contar com recursos internos do Orleans e pode ser escalado para um número muito grande de filas, pois a criação de um agente de pull é tão barata quanto a criação de uma granularidade.

Cada agente de pull executa um temporizador periódico que é retirado da fila pela invocação do método IQueueAdapterReceiver.GetQueueMessagesAsync. As mensagens retornadas são colocadas na estrutura de dados interna por agente chamada IQueueCache. Cada mensagem é inspecionada para descobrir o respectivo fluxo de destino. O agente usa a publicação/assinatura para descobrir a lista de consumidores de fluxo que assinaram esse fluxo. Depois que a lista de consumidores é recuperada, o agente a armazena localmente (no cache da publicação/assinatura) para que não precise consultar a publicação/assinatura em cada mensagem. O agente também assina a publicação/assinatura para receber a notificação de todos os novos consumidores que assinam esse fluxo. Esse handshake entre o agente e a publicação/assinatura garante uma semântica de assinatura de streaming forte: depois que o consumidor tiver se inscrito no fluxo, ele verá todos os eventos que foram gerados após a assinatura dele. Além disso, o uso de StreamSequenceToken permite que ele se inscreva no passado.

Cache de fila

O IQueueCache é uma estrutura interna de dados por agente que permite separar novos eventos da fila e entregá-los aos consumidores. Ele também permite a separação da entrega para diferentes fluxos e diferentes consumidores.

Imagine uma situação em que um fluxo tenha três consumidores de fluxo e um deles seja lento. Se você não tomar cuidado, esse consumidor lento poderá afetar o progresso do agente, diminuindo o consumo de outros consumidores desse fluxo e até retardando a remoção da fila e a entrega de eventos para outros fluxos. Para evitar isso e permitir o paralelismo máximo no agente, usamos IQueueCache.

O IQueueCache armazena em buffer os eventos de fluxo e fornece uma forma para o agente entregar eventos a cada consumidor em um ritmo próprio. A entrega por consumidor é implementada pelo componente interno chamado IQueueCacheCursor, que acompanha o progresso por consumidor. Dessa forma, cada consumidor recebe eventos em um ritmo próprio: os consumidores rápidos recebem eventos tão rapidamente quanto são removidos da fila, enquanto os consumidores lentos os recebem mais tarde. Depois que a mensagem for entregue a todos os consumidores, ela poderá ser excluída do cache.

Contrapressão

O backpressure no runtime de streaming do Orleans se aplica em dois locais: trazer eventos de fluxo da fila para o agente e entregar os eventos do agente aos consumidores de fluxo.

Este último é fornecido pelo mecanismo interno de entrega de mensagens do Orleans. Cada evento de fluxo é entregue do agente aos consumidores por meio das mensagens padrão de granularidade do Orleans, uma por vez. Ou seja, os agentes enviam um evento (ou um lote de eventos de tamanho limitado) para cada consumidor de fluxo e aguardam essa chamada. O próximo evento não começará a ser entregue até que a tarefa do evento anterior tenha sido resolvida ou interrompida. Dessa forma, naturalmente, limitamos a taxa de entrega por consumidor a uma mensagem por vez.

Ao trazer eventos de fluxo da fila para o agente, o streaming do Orleans fornece um novo mecanismo especial de backpressure. Como o agente separa a remoção dos eventos da fila e a entrega deles aos consumidores, um consumidor lento individual pode ficar para trás, tanto que o IQueueCache será preenchido. Para evitar o crescimento por tempo indefinido do IQueueCache, limitamos o tamanho dele (o limite de tamanho é configurável). No entanto, o agente nunca joga fora os eventos não entregues.

Em vez disso, quando o cache começa a ser preenchido, os agentes reduzem a taxa de remoção de eventos da fila. Dessa forma, podemos "sobreviver" nos períodos de entrega lentos ajustando a taxa na qual consumimos da fila ("backpressure") e voltar às taxas de consumo rápidas mais tarde. Para detectar os vales de "entrega lenta", o IQueueCache usa uma estrutura de dados interna de buckets de cache que acompanha o progresso da entrega de eventos aos consumidores de fluxo individuais. Isso resulta em um sistema muito responsivo e autoajustado.