Partilhar via


Visão geral dos fluxos de fila do Azure

Cada provedor de fluxo (Azure Queues, EventHub, SMS, SQS, etc.) tem seus próprios detalhes e configurações específicos da fila. Esta seção fornece detalhes sobre o uso, a configuração e a implementação dos Fluxos de Fila doOrleans Azure. Esta seção não é abrangente. Você pode encontrar mais detalhes nos testes de streaming, que contêm a maioria das opções de configuração, especificamente AQClientStreamTests e AQSubscriptionMultiplicityTests, e as funções de extensão para IAzureQueueStreamConfigurator e ISiloPersistentStreamConfigurator.

Orleans A Fila do Azure requer o pacote NuGet Microsoft.Orleans.Streaming.AzureStorage. Além da implementação, o pacote contém métodos de extensão que simplificam a configuração na inicialização do silo. A configuração mínima requer a especificação da cadeia de conexão, por exemplo:

hostBuilder
    .AddAzureQueueStreams("AzureQueueProvider", configurator =>
    {
        configurator.ConfigureAzureQueue(
            ob => ob.Configure(options =>
            {
                options.ConnectionString = "[PLACEHOLDER]";
                options.QueueNames = new List<string> { "yourprefix-azurequeueprovider-0" };
            }));
    configurator.ConfigureCacheSize(1024);
    configurator.ConfigurePullingAgent(ob => ob.Configure(options =>
    {
      options.GetQueueMsgsTimerPeriod = TimeSpan.FromMilliseconds(200);
    }));
  })
  // a PubSubStore could be needed, as example Azure Table Storage
  .AddAzureTableGrainStorage("PubSubStore", options => {
    options.ConnectionString = "[PLACEHOLDER]";
  })

Os agentes extraem repetidamente até que não haja mais mensagens numa fila e, em seguida, atrasam por um período configurável antes de continuarem a extrair. Esse processo ocorre para cada fila. Internamente, os agentes de recebimento colocam mensagens em um cache (um cache por fila) para entrega aos consumidores, mas param de ler se o cache for preenchido. As mensagens são removidas do cache assim que os consumidores as processam, portanto, a taxa de leitura deve ser aproximadamente limitada pela taxa de processamento dos consumidores.

Por padrão, Orleans Azure Queue usa 8 filas (consulte AzureQueueOptions) e 8 agentes de recebimento relacionados, um atraso de 100 ms (consulte StreamPullingAgentOptions.GetQueueMsgsTimerPeriod) e um tamanho de cache (IQueueCache) de 4096 mensagens (consulte SimpleQueueCacheOptions.CacheSize).

Configuração

A configuração padrão deve se adequar a um ambiente de produção, mas para necessidades especiais, é possível configurar o comportamento padrão. Por exemplo, em uma máquina de desenvolvimento, é possível reduzir o número de agentes de sondagem para usar apenas uma fila. Isso pode ajudar a reduzir o uso da CPU e a pressão de recursos.

hostBuilder
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>(
        "AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options =>
            {
                options.ConnectionString = "[PLACEHOLDER]";
                options.QueueNames =
                    new List<string>
                    {
                        "yourprefix-azurequeueprovider-0"
                    };
            }))

Ajuste

Em um sistema de produção, talvez seja necessário ajustar a configuração padrão. Alguns fatores devem ser considerados ao ajustar, e são específicos do serviço.

  1. Primeiro, a maioria das configurações são por fila, portanto, para um grande número de fluxos, a carga em cada fila pode ser reduzida configurando mais filas.
  2. Como os fluxos processam mensagens em ordem por fluxo, o fator de regulação será o número de eventos enviados em um único fluxo.
  3. Um equilíbrio razoável entre o tempo de cache (StreamPullingAgentOptions.GetQueueMsgsTimerPeriod) e o tempo de visibilidade (AzureQueueOptions.MessageVisibilityTimeout) é que a visibilidade deve ser configurada para dobrar o tempo esperado para que as mensagens estejam no cache.

Exemplo

Assumindo um sistema com estas características:

  • 100 transmissões,
  • 10 filas,
  • Cada fluxo processa 60 mensagens por minuto,
  • Cada mensagem demora cerca de 30ms a processar,
  • 1 minuto de mensagens em cache (tempo de cache).

Assim, podemos calcular alguns parâmetros do sistema:

  • Streams/fila: Um equilíbrio uniforme de fluxos entre filas seria um ideal de 10 fluxos/fila (100 fluxos / 10 filas). Mas como os fluxos nem sempre serão equilibrados ao longo das filas, dobrar (ou até triplicar) o ideal é mais seguro do que esperar a distribuição ideal. Portanto, 20 fluxos/fila (10 fluxos/fila x 2 como fator de segurança) é provavelmente razoável.

  • Mensagens/minuto: Isso significa que cada fila deverá processar até 1200 mensagens/minuto (60 mensagens x 20 fluxos).

Em seguida, podemos determinar o tempo de visibilidade a ser usado:

  • Tempo de visibilidade: O tempo de cache (1 minuto) está configurado para armazenar 1 minuto de mensagens (portanto, 1200 mensagens, como calculamos mensagens/minuto acima). Assumimos que cada mensagem leva 30 ms para ser processada, então podemos esperar que as mensagens passem até 36 segundos no cache (0,030 seg x 1200 msg = 36 seg), então o tempo de visibilidade - dobrado por segurança - precisaria ser superior a 72 segundos (36 segundos de tempo no cache x 2). Assim, se definirmos um cache maior, isso exigiria um tempo de visibilidade maior.

Considerações finais em um sistema do mundo real:

  • Como a ordem é apenas por fluxo e uma fila consome muitos fluxos, as mensagens provavelmente serão processadas em vários fluxos em paralelo (como exemplo: temos um grão para o fluxo, que pode ser executado em paralelo). Isso significa que queimaremos o cache em muito menos tempo, mas planejamos para o pior caso: isso dará ao sistema espaço para continuar a funcionar bem, mesmo sob atrasos intermitentes e erros transitórios.

Assim, podemos configurar o Azure Queue Streams usando:

hostBuilder
  .AddAzureQueueStreams("AzureQueueProvider", configurator => {
    configurator.ConfigureAzureQueue(
      ob => ob.Configure(options => {
        options.ConnectionString = "[PLACEHOLDER]";
        options.QueueNames = new List<string> {
          "yourprefix-azurequeueprovider-1",
          [...]
          "yourprefix-azurequeueprovider-10",
        };
        options.MessageVisibilityTimeout = TimeSpan.FromSeconds(72);
      }));
    configurator.ConfigureCacheSize(1200);
  })