Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Cada provedor de fluxo (Azure Queues, EventHub, SMS, SQS, etc.) tem seus próprios detalhes e configurações específicos da fila. Esta seção fornece detalhes sobre o uso, a configuração e a implementação dos Fluxos de Fila doOrleans Azure. Esta seção não é abrangente. Você pode encontrar mais detalhes nos testes de streaming, que contêm a maioria das opções de configuração, especificamente AQClientStreamTests
e AQSubscriptionMultiplicityTests
, e as funções de extensão para IAzureQueueStreamConfigurator e ISiloPersistentStreamConfigurator.
Orleans A Fila do Azure requer o pacote NuGet Microsoft.Orleans.Streaming.AzureStorage. Além da implementação, o pacote contém métodos de extensão que simplificam a configuração na inicialização do silo. A configuração mínima requer a especificação da cadeia de conexão, por exemplo:
hostBuilder
.AddAzureQueueStreams("AzureQueueProvider", configurator =>
{
configurator.ConfigureAzureQueue(
ob => ob.Configure(options =>
{
options.ConnectionString = "[PLACEHOLDER]";
options.QueueNames = new List<string> { "yourprefix-azurequeueprovider-0" };
}));
configurator.ConfigureCacheSize(1024);
configurator.ConfigurePullingAgent(ob => ob.Configure(options =>
{
options.GetQueueMsgsTimerPeriod = TimeSpan.FromMilliseconds(200);
}));
})
// a PubSubStore could be needed, as example Azure Table Storage
.AddAzureTableGrainStorage("PubSubStore", options => {
options.ConnectionString = "[PLACEHOLDER]";
})
Os agentes extraem repetidamente até que não haja mais mensagens numa fila e, em seguida, atrasam por um período configurável antes de continuarem a extrair. Esse processo ocorre para cada fila. Internamente, os agentes de recebimento colocam mensagens em um cache (um cache por fila) para entrega aos consumidores, mas param de ler se o cache for preenchido. As mensagens são removidas do cache assim que os consumidores as processam, portanto, a taxa de leitura deve ser aproximadamente limitada pela taxa de processamento dos consumidores.
Por padrão, Orleans Azure Queue usa 8 filas (consulte AzureQueueOptions) e 8 agentes de recebimento relacionados, um atraso de 100 ms (consulte StreamPullingAgentOptions.GetQueueMsgsTimerPeriod) e um tamanho de cache (IQueueCache) de 4096 mensagens (consulte SimpleQueueCacheOptions.CacheSize).
Configuração
A configuração padrão deve se adequar a um ambiente de produção, mas para necessidades especiais, é possível configurar o comportamento padrão. Por exemplo, em uma máquina de desenvolvimento, é possível reduzir o número de agentes de sondagem para usar apenas uma fila. Isso pode ajudar a reduzir o uso da CPU e a pressão de recursos.
hostBuilder
.AddAzureQueueStreams<AzureQueueDataAdapterV2>(
"AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options =>
{
options.ConnectionString = "[PLACEHOLDER]";
options.QueueNames =
new List<string>
{
"yourprefix-azurequeueprovider-0"
};
}))
Ajuste
Em um sistema de produção, talvez seja necessário ajustar a configuração padrão. Alguns fatores devem ser considerados ao ajustar, e são específicos do serviço.
- Primeiro, a maioria das configurações são por fila, portanto, para um grande número de fluxos, a carga em cada fila pode ser reduzida configurando mais filas.
- Como os fluxos processam mensagens em ordem por fluxo, o fator de regulação será o número de eventos enviados em um único fluxo.
- Um equilíbrio razoável entre o tempo de cache (StreamPullingAgentOptions.GetQueueMsgsTimerPeriod) e o tempo de visibilidade (AzureQueueOptions.MessageVisibilityTimeout) é que a visibilidade deve ser configurada para dobrar o tempo esperado para que as mensagens estejam no cache.
Exemplo
Assumindo um sistema com estas características:
- 100 transmissões,
- 10 filas,
- Cada fluxo processa 60 mensagens por minuto,
- Cada mensagem demora cerca de 30ms a processar,
- 1 minuto de mensagens em cache (tempo de cache).
Assim, podemos calcular alguns parâmetros do sistema:
Streams/fila: Um equilíbrio uniforme de fluxos entre filas seria um ideal de 10 fluxos/fila (100 fluxos / 10 filas). Mas como os fluxos nem sempre serão equilibrados ao longo das filas, dobrar (ou até triplicar) o ideal é mais seguro do que esperar a distribuição ideal. Portanto, 20 fluxos/fila (10 fluxos/fila x 2 como fator de segurança) é provavelmente razoável.
Mensagens/minuto: Isso significa que cada fila deverá processar até 1200 mensagens/minuto (60 mensagens x 20 fluxos).
Em seguida, podemos determinar o tempo de visibilidade a ser usado:
- Tempo de visibilidade: O tempo de cache (1 minuto) está configurado para armazenar 1 minuto de mensagens (portanto, 1200 mensagens, como calculamos mensagens/minuto acima). Assumimos que cada mensagem leva 30 ms para ser processada, então podemos esperar que as mensagens passem até 36 segundos no cache (0,030 seg x 1200 msg = 36 seg), então o tempo de visibilidade - dobrado por segurança - precisaria ser superior a 72 segundos (36 segundos de tempo no cache x 2). Assim, se definirmos um cache maior, isso exigiria um tempo de visibilidade maior.
Considerações finais em um sistema do mundo real:
- Como a ordem é apenas por fluxo e uma fila consome muitos fluxos, as mensagens provavelmente serão processadas em vários fluxos em paralelo (como exemplo: temos um grão para o fluxo, que pode ser executado em paralelo). Isso significa que queimaremos o cache em muito menos tempo, mas planejamos para o pior caso: isso dará ao sistema espaço para continuar a funcionar bem, mesmo sob atrasos intermitentes e erros transitórios.
Assim, podemos configurar o Azure Queue Streams usando:
hostBuilder
.AddAzureQueueStreams("AzureQueueProvider", configurator => {
configurator.ConfigureAzureQueue(
ob => ob.Configure(options => {
options.ConnectionString = "[PLACEHOLDER]";
options.QueueNames = new List<string> {
"yourprefix-azurequeueprovider-1",
[...]
"yourprefix-azurequeueprovider-10",
};
options.MessageVisibilityTimeout = TimeSpan.FromSeconds(72);
}));
configurator.ConfigureCacheSize(1200);
})