Partilhar via


Orleans Detalhes da implementação de fluxos de dados

Esta seção fornece uma visão geral de alto nível da implementação do Orleans Stream. Ele descreve conceitos e detalhes que não são visíveis no nível do aplicativo. Se você planeja usar apenas streams, não precisa ler esta seção.

Terminologia:

Usamos a palavra "fila" para nos referirmos a qualquer tecnologia de armazenamento duradouro que possa ingerir eventos de fluxo e permita puxar eventos ou forneça um mecanismo baseado em push para consumir eventos. Normalmente, para fornecer escalabilidade, essas tecnologias fornecem filas fragmentadas/particionadas. Por exemplo, as Filas do Azure permitem criar várias filas e os Hubs de Eventos têm vários hubs.

Fluxos persistentes

Todos os Orleans provedores de fluxo persistente compartilham uma implementação PersistentStreamProvidercomum. Esses provedores de stream genéricos precisam ser configurados com uma tecnologia específica IQueueAdapterFactory.

Por exemplo, para fins de teste, temos adaptadores de fila que geram seus dados de teste em vez de ler os dados de uma fila. O código abaixo mostra como configuramos um provedor de fluxo persistente para usar nosso adaptador de fila personalizado (gerador). Ele faz isso configurando o provedor de fluxo persistente com uma função de fábrica usada para criar o adaptador.

hostBuilder.AddPersistentStreams(
    StreamProviderName, GeneratorAdapterFactory.Create);

Quando um produtor de fluxo gera um novo item de fluxo e chama stream.OnNext(), o runtime de streaming Orleans invoca o método apropriado no IQueueAdapter do provedor de fluxo, que enfileira o item diretamente para a fila apropriada.

Agentes de tração

No coração do Provedor de Fluxo Persistente estão os agentes de tração. Os agentes de extração extraem eventos de um conjunto de filas duráveis e os entregam ao código do aplicativo em grãos que os consomem. Pode-se pensar nos agentes de tração como um "microsserviço" distribuído -- um componente distribuído particionado, altamente disponível e elástico. Os agentes de extração são executados dentro dos mesmos silos que hospedam grãos de aplicativos e são totalmente gerenciados pelo Orleans Streaming Runtime.

StreamQueueMapper e StreamQueueBalancer

Os agentes de tração são parametrizados com IStreamQueueMapper e IStreamQueueBalancer. O IStreamQueueMapper fornece uma lista de todas as filas e também é responsável pelo mapeamento de fluxos de dados para filas. Dessa forma, o lado produtor do Provedor de Fluxo Persistente sabe em qual fila enfileirar a mensagem.

O IStreamQueueBalancer expressa a forma como as filas são equilibradas entre Orleans silos e agentes. O objetivo é atribuir filas aos agentes de um modo equilibrado, para prevenir gargalos e suportar a flexibilidade. Quando um novo silo é adicionado ao cluster, as Orleans filas são automaticamente rebalanceadas entre os silos antigos e novos. O StreamQueueBalancer permite personalizar esse processo. Orleans tem vários StreamQueueBalancers internos, para dar suporte a diferentes cenários de balanceamento (grande e pequeno número de filas) e diferentes ambientes (Azure, on-prem, static).

Usando o exemplo do gerador de teste acima, o código abaixo mostra como se pode configurar o mapeador de filas e o balanceador de filas.

hostBuilder
    .AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
        providerConfigurator =>
        providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
            ob => ob.Configure(options => options.TotalQueueCount = 8))
      .UseDynamicClusterConfigDeploymentBalancer());

O código acima configura o GeneratorAdapterFactory para usar um mapeador de filas com oito filas e equilibra as filas no cluster usando o DynamicClusterConfigDeploymentBalancer.

Protocolo de Extração

Cada silo executa um conjunto de agentes de extração, cada agente está retirando de uma fila. Os próprios agentes de extração são implementados por um componente de tempo de execução interno, chamado SystemTarget. SystemTargets são essencialmente grãos de execução, estão sujeitos a execução em single-thread, podem usar mensagens de grãos regulares e são tão leves quanto grãos. Ao contrário dos componentes de sistema, os SystemTargets não são virtuais: eles são criados explicitamente (pelo runtime) e não têm transparência de localização. Ao implementar agentes pulling como SystemTargets, o Orleans Streaming Runtime pode contar com recursos internos Orleans e pode ser dimensionado para um número muito grande de filas, já que criar um novo agente pulling é tão barato quanto criar um novo grão.

Cada agente de extração executa um temporizador periódico que extrai da fila invocando o método IQueueAdapterReceiver.GetQueueMessagesAsync. As mensagens retornadas são colocadas na estrutura de dados interna por agente chamada IQueueCache. Cada mensagem é inspecionada para descobrir seu fluxo de destino. O agente usa o Pub-Sub para determinar a lista de consumidores de transmissão que se inscreveram nesta transmissão. Após a lista de consumidores ser recuperada, o agente armazena-a localmente (em seu cache pub-sub) de modo a não precisar consultar Pub-Sub em todas as mensagens. O agente também se inscreve no pub-sub para receber notificações de quaisquer novos consumidores que assinem esse fluxo. Esse aperto de mão entre o agente e o pub-sub garante uma forte semântica de assinatura de streaming: uma vez que o consumidor tenha se inscrito no streaming, ele verá todos os eventos que foram gerados depois que ele se inscreveu. Além disso, o uso de StreamSequenceToken permite subscrição retroativa.

Cache de fila

IQueueCache é uma estrutura de dados interna por agente que permite dissociar novos eventos da fila e entregá-los aos consumidores. Permite também dissociar a entrega a diferentes fluxos e diferentes consumidores.

Imagine uma situação em que um stream tem 3 consumidores de stream e um deles é lento. Se não forem tomados cuidados, essa lentidão do consumidor pode impactar o progresso do agente, retardando o consumo de outros consumidores daquele fluxo, e até mesmo retardando o enfileiramento e a entrega de eventos para outros fluxos. Para evitar isso e permitir o máximo paralelismo no agente, usamos IQueueCache.

IQueueCache Buffers transmite eventos e fornece uma maneira para o agente entregar eventos para cada consumidor em seu próprio ritmo. A entrega para cada consumidor é implementada pelo componente interno chamado IQueueCacheCursor, que monitoriza o progresso de cada consumidor. Dessa forma, cada consumidor recebe eventos no seu próprio ritmo: os consumidores rápidos recebem eventos tão rapidamente quanto são retirados da fila, enquanto os consumidores lentos os recebem mais tarde. Uma vez que a mensagem é entregue a todos os consumidores, ela pode ser excluída do cache.

Contrapressão

A backpressure no Orleans Streaming Runtime se aplica em dois lugares: trazendo eventos de fluxo da fila para o agente e entregando os eventos do agente para os consumidores de streaming.

Este último é fornecido pelo mecanismo de entrega de mensagens incorporado Orleans . Cada evento de transmissão é entregue do agente aos consumidores por meio das mensagens de grãos padrão Orleans , uma de cada vez. Ou seja, os agentes enviam um evento (ou um lote de eventos de tamanho limitado) para cada consumidor de fluxo e aguardam essa chamada. O próximo evento não começará a ser entregue até que a Tarefa do evento anterior tenha sido resolvida ou quebrada. Desta forma, limitamos naturalmente a taxa de entrega por consumidor a uma mensagem de cada vez.

Ao trazer eventos de fluxo da fila para o agente, Orleans o Streaming fornece um novo mecanismo especial de Backpressure. Uma vez que o agente separa o desenfileiramento de eventos da fila e os entrega aos consumidores, um único consumidor lento pode ficar para trás tanto que o IQueueCache pode encher. Para evitar IQueueCache que cresça indefinidamente, limitamos o seu tamanho (o limite de tamanho é configurável). No entanto, o agente nunca descarta eventos não entregues.

Em vez disso, quando o cache começa a encher, os agentes reduzem a frequência de remoção de eventos da fila. Dessa forma, podemos "ultrapassar" os períodos de entrega lentos ajustando a taxa na qual consumimos da fila de espera ("backpressure") e posteriormente retomar taxas de consumo elevadas. Para detetar os vales de "entrega lenta", usa-se uma IQueueCache estrutura de dados interna de buckets de cache que rastreia o progresso da entrega de eventos para consumidores de fluxo individuais. Isso resulta em um sistema muito responsivo e autoajustável.