Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Cada proveedor de flujos (Colas de Azure, EventHub, SMS, SQS, etc.) tiene sus propios detalles y configuraciones específicas para cada cola. En esta sección se proporcionan detalles sobre el uso, la configuración y la implementación de Orleans Azure Queue Streams. Esta sección no es completa. Puede encontrar más detalles en las pruebas de streaming, que contienen la mayoría de las opciones de configuración, específicamente AQClientStreamTests y AQSubscriptionMultiplicityTests, y las funciones de extensión para IAzureQueueStreamConfigurator y ISiloPersistentStreamConfigurator.
Orleans Azure Queue requiere el paquete NuGet Microsoft.Orleans.Streaming.AzureStorage. Además de la implementación, el paquete contiene métodos de extensión que simplifican la configuración en el inicio del silo. La configuración mínima requiere especificar la cadena de conexión, por ejemplo:
hostBuilder
.AddAzureQueueStreams("AzureQueueProvider", configurator =>
{
configurator.ConfigureAzureQueue(
ob => ob.Configure(options =>
{
options.ConnectionString = "[PLACEHOLDER]";
options.QueueNames = new List<string> { "yourprefix-azurequeueprovider-0" };
}));
configurator.ConfigureCacheSize(1024);
configurator.ConfigurePullingAgent(ob => ob.Configure(options =>
{
options.GetQueueMsgsTimerPeriod = TimeSpan.FromMilliseconds(200);
}));
})
// a PubSubStore could be needed, as example Azure Table Storage
.AddAzureTableGrainStorage("PubSubStore", options => {
options.ConnectionString = "[PLACEHOLDER]";
})
Los agentes de extracción ejecutan repetidas extracciones hasta que ya no quedan mensajes en la cola, y luego esperan durante un período configurable antes de continuar con la extracción. Este proceso se produce para cada cola. Internamente, los agentes de extracción colocan mensajes en una memoria caché (una caché por cola) para la entrega a los consumidores, pero dejan de leer si la memoria caché se llena. Los mensajes se quitan de la memoria caché una vez que los consumidores los procesan, por lo que la velocidad de lectura debe limitarse aproximadamente a la velocidad de procesamiento de los consumidores.
Orleans De forma predeterminada, la cola de Azure usa 8 colas (consulte AzureQueueOptions) y 8 agentes de extracción relacionados, un retraso de 100 ms (consulte StreamPullingAgentOptions.GetQueueMsgsTimerPeriod), y un tamaño de caché (IQueueCache) de 4096 mensajes (consulte SimpleQueueCacheOptions.CacheSize).
Configuración
La configuración predeterminada debe ajustarse a un entorno de producción, pero para necesidades especiales, es posible configurar el comportamiento predeterminado. Por ejemplo, en una máquina de desarrollo, es posible reducir el número de agentes de sondeo para usar únicamente una cola. Esto puede ayudar a reducir el uso de CPU y la presión de los recursos.
hostBuilder
.AddAzureQueueStreams<AzureQueueDataAdapterV2>(
"AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options =>
{
options.ConnectionString = "[PLACEHOLDER]";
options.QueueNames =
new List<string>
{
"yourprefix-azurequeueprovider-0"
};
}))
Ajuste
En un sistema de producción, es posible que tenga que ajustar la configuración predeterminada. Al ajustar, se deben tener en cuenta algunos factores, y es específico del servicio.
- En primer lugar, la mayoría de las configuraciones son por cola, lo que significa que para un gran número de flujos, la carga en cada cola se puede reducir configurando más colas.
- Dado que las secuencias procesan los mensajes en orden de forma individual, el factor limitante será el número de eventos que se envían en una sola secuencia.
- Un equilibrio razonable del tiempo de caché (StreamPullingAgentOptions.GetQueueMsgsTimerPeriod) frente al tiempo de visibilidad (AzureQueueOptions.MessageVisibilityTimeout) es que se debe configurar la visibilidad para duplicar los mensajes de tiempo que se espera que estén en la memoria caché.
Ejemplo
Suponiendo que un sistema tiene estas características:
- 100 secuencias,
- 10 colas,
- Cada flujo procesa 60 mensajes por minuto,
- Cada mensaje tarda unos 30 ms en procesarse,
- 1 minuto de mensajes en caché (tiempo de caché).
Por lo tanto, podemos calcular algunos parámetros del sistema:
Secuencias/colas: un equilibrio uniforme de secuencias entre colas sería lo ideal, 10 secuencias por cola (100 secuencias / 10 colas). Pero dado que las corrientes no siempre se distribuirán uniformemente sobre las colas, duplicar (o incluso triplicar) el ideal resulta más seguro que esperar una distribución ideal. Por lo tanto, 20 secuencias/cola (10 secuencias/cola x 2 como factor de seguridad) probablemente sea razonable.
Mensajes/minuto: esto significa que se espera que cada cola procese hasta 1200 mensajes/minuto (60 mensajes x 20 secuencias).
A continuación, podemos determinar el tiempo de visibilidad que se va a usar:
- Tiempo de visibilidad: el tiempo de caché (1 minuto) está configurado para contener 1 minuto de mensajes (es decir, 1200 mensajes, como hemos calculado mensajes por minuto arriba). Se supone que cada mensaje tarda 30 ms en procesarse, entonces podemos esperar que los mensajes pasen hasta 36 segundos en la memoria caché (0,030 s x 1200 msg = 36 segundos), por lo que el tiempo de visibilidad ,duplicado para la seguridad, necesitaría más de 72 segundos (36 segundos de tiempo en caché x 2). En consecuencia, si definimos una memoria caché más grande, eso requeriría un tiempo de visibilidad más largo.
Consideraciones finales en un sistema real:
- Dado que el orden es solo por flujo y una cola consume muchos flujos, es probable que los mensajes se procesen en múltiples flujos en paralelo (como ejemplo: tenemos un grain para el flujo, que puede funcionar en paralelo). Esto significa que consumiremos la memoria caché en mucho menos tiempo, pero planificamos para el peor caso: le dará al sistema el espacio necesario para seguir funcionando correctamente incluso con retrasos intermitentes y errores transitorios.
Por lo tanto, podemos configurar Azure Queue Streams mediante:
hostBuilder
.AddAzureQueueStreams("AzureQueueProvider", configurator => {
configurator.ConfigureAzureQueue(
ob => ob.Configure(options => {
options.ConnectionString = "[PLACEHOLDER]";
options.QueueNames = new List<string> {
"yourprefix-azurequeueprovider-1",
[...]
"yourprefix-azurequeueprovider-10",
};
options.MessageVisibilityTimeout = TimeSpan.FromSeconds(72);
}));
configurator.ConfigureCacheSize(1200);
})