Detalles de implementación de secuencias de Orleans

En esta sección se proporciona información general sobre la implementación de secuencias de Orleans. Se describen los conceptos y detalles que no son visibles en el nivel de la aplicación. Si solo tiene previsto usar secuencias, no tiene que leer esta sección.

Terminología:

Con el término "cola" nos referimos a cualquier tecnología de almacenamiento duradero que puede ingerir eventos de secuencia y permite extraer eventos o proporcionar un mecanismo basado en inserción para consumir eventos. Normalmente, para proporcionar escalabilidad, esas tecnologías proporcionan colas particionadas. Por ejemplo, las colas de Azure permiten crear varias colas, y Event Hubs tiene varios centros.

Secuencias persistentes

Todos los proveedores de secuencias persistentes de Orleans comparten una implementación común PersistentStreamProvider. Estos proveedores de secuencias genéricos deben configurarse con un generador IQueueAdapterFactory específico de la tecnología.

Por ejemplo, con fines de prueba, hay adaptadores de cola que generan sus propios datos de prueba, en lugar de leer los datos de una cola. En el código siguiente se muestra cómo se configura un proveedor de secuencias persistente para usar el adaptador de cola (generador) personalizado. Para ello, se configura el proveedor de secuencias persistente con una función de generador que se usa para crear el adaptador.

hostBuilder.AddPersistentStreams(
    StreamProviderName, GeneratorAdapterFactory.Create);

Cuando un productor de secuencias genera un nuevo elemento de secuencia y llama a stream.OnNext(), el runtime de streaming de Orleans invoca el método adecuado en el objeto IQueueAdapter de ese proveedor de secuencias, que pone en cola el elemento directamente en la cola adecuada.

Agentes de extracción

En el centro del proveedor de secuencias persistentes se encuentran los agentes de extracción. Los agentes de extracción extraen eventos de un conjunto de colas duraderas y los entrega al código de la aplicación en granos que los consume. Los agentes de extracción podrían considerarse un "microservicio" distribuido, es decir, un componente distribuido elástico con particiones y alta disponibilidad. Los agentes de extracción se ejecutan dentro de los mismos silos que hospedan los granos de aplicación y están totalmente administrados por el runtime de streaming de Orleans.

StreamQueueMapper y StreamQueueBalancer

Los agentes de extracción se parametrizan con IStreamQueueMapper y IStreamQueueBalancer. IStreamQueueMapper proporciona una lista de todas las colas y también es responsable de asignar secuencias a colas. De este modo, el lado productor del proveedor de secuencias persistentes sabe en qué cola debe poner el mensaje.

IStreamQueueBalancer expresa la forma en que las colas se equilibran entre los agentes y los silos de Orleans. El objetivo es asignar colas a los agentes de forma equilibrada para evitar cuellos de botella y admitir la elasticidad. Cuando se agrega un nuevo silo al clúster de Orleans, las colas se reequilibran automáticamente entre los silos antiguos y los nuevos. StreamQueueBalancer permite personalizar este proceso. Orleans tiene varios elementos StreamQueueBalancers integrados para admitir diferentes escenarios de equilibrio (número grande y pequeño de colas) y entornos diferentes (Azure, local y estático).

Siguiendo con el ejemplo anterior del generador de prueba, en el código siguiente se muestra cómo se puede configurar el asignador de colas y el equilibrador de colas.

hostBuilder
    .AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
        providerConfigurator =>
        providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
            ob => ob.Configure(options => options.TotalQueueCount = 8))
      .UseDynamicClusterConfigDeploymentBalancer());

El código anterior configura GeneratorAdapterFactory para usar un asignador de colas con ocho colas y equilibra las colas en todo el clúster mediante DynamicClusterConfigDeploymentBalancer.

Protocolo de extracción

Cada silo ejecuta un conjunto de agentes de extracción, cada uno de los cuales realiza la extracción de una cola. Los propios agentes de extracción se implementan mediante un componente del runtime interno, denominado SystemTarget. Los componentes SystemTarget son esencialmente granos del runtime, están sujetos a simultaneidad uniproceso, pueden usar la mensajería de grano normal y son tan ligeros como los granos. A diferencia de los granos, los componentes SystemTarget no son virtuales: los crea explícitamente el runtime y su ubicación no es transparente. Al implementar agentes de extracción como componentes SystemTarget, el runtime de streaming de Orleans puede basarse en características integradas de Orleans y escalar a un gran número de colas, ya que el proceso de crear un agente de extracción es tan barato como crear un grano.

Cada agente de extracción ejecuta un temporizador periódico que extrae de la cola mediante la invocación del método IQueueAdapterReceiver.GetQueueMessagesAsync. Los mensajes devueltos se colocan en la estructura de datos interna por agente denominada IQueueCache. Cada mensaje se inspecciona para averiguar cuál es su secuencia de destino. El agente usa Pub-Sub para averiguar la lista de consumidores de secuencias que se suscribieron a esta secuencia. Una vez que se ha recuperado la lista de consumidores, el agente la almacena localmente (en su caché de publicación/suscripción), por lo que no es necesario consultar con Pub-Sub cada mensaje. El agente también se suscribe a la publicación/suscripción para recibir notificaciones de los nuevos consumidores que se suscriben a esa secuencia. Este protocolo de enlace entre el agente y Pub-Sub garantiza una semántica de suscripción de streaming segura. Es decir, una vez que el consumidor se haya suscrito a la secuencia, verá todos los eventos que se generaron después de que se suscribiera. Además, el uso de StreamSequenceToken permite suscribirse en el pasado.

Caché de colas

IQueueCache es una estructura de datos interna por agente que permite desacoplar la eliminación de nuevos eventos de la cola y entregarlos a los consumidores. También permite desacoplar la entrega a diferentes secuencias y consumidores.

Imagine que una secuencia tiene tres consumidores de secuencias y uno de ellos es lento. Si no se tiene cuidado, este consumidor lento puede afectar al progreso del agente, ralentizar el consumo de otros consumidores de esa secuencia e incluso ralentizar la eliminación de la cola y la entrega de eventos para otras secuencias. Para evitar eso y permitir el paralelismo máximo en el agente, se usa IQueueCache.

IQueueCache almacena en búfer eventos de secuencia y proporciona una manera de que el agente entregue eventos a cada consumidor a su propio ritmo. La entrega por consumidor se implementa mediante el componente interno denominado IQueueCacheCursor, que realiza un seguimiento del progreso por consumidor. De este modo, cada consumidor recibe eventos a su propio ritmo: los consumidores rápidos reciben eventos en cuanto se quitan de la cola, mientras que los consumidores lentos los reciben más adelante. Una vez que el mensaje se ha entregado a todos los consumidores, puede eliminarse de la memoria caché.

Contrapresión

La contrapresión en el runtime de streaming de Orleans se aplica en dos acciones: llevar eventos de secuencia de la cola al agente y entregar los eventos del agente a los consumidores de secuencias.

Esto último lo facilita el mecanismo integrado de entrega de mensajes de Orleans. Cada evento de secuencia se entrega del agente a los consumidores mediante la mensajería de grano estándar de Orleans, de uno en uno. Es decir, los agentes envían un evento (o un lote de eventos de tamaño limitado) a cada consumidor de secuencias y esperan esta llamada. El siguiente evento no empezará a entregarse hasta que se resuelva o interrumpa la tarea del evento anterior. De este modo, se limita naturalmente la tasa de entrega por consumidor a un mensaje de cada vez.

Al llevar eventos de secuencia de la cola al agente, el streaming de Orleans proporciona un nuevo mecanismo especial de contrapresión. Dado que el agente desacopla la eliminación de eventos de la cola y su entrega a los consumidores, un único consumidor lento puede quedar tan retrasado que IQueueCache se llene. Para evitar que IQueueCache crezca indefinidamente, su tamaño se limita (el límite de tamaño es configurable). Aun así, el agente nunca descarta eventos no entregados.

En su lugar, cuando la memoria caché empieza a llenarse, los agentes ralentizan la velocidad de eliminación de eventos de la cola. De este modo, se pueden sobrellevar los períodos de entrega lentos, ya que se ajusta la velocidad a la que se consume desde la cola ("contrapresión") y se retoman más adelante las velocidades de consumo rápidas. Para detectar los valles de "entrega lenta", IQueueCache usa una estructura de datos interna de cubos de caché que lleva un seguimiento del progreso de la entrega de eventos a consumidores de secuencias individuales. Esto da como resultado un sistema que se ajusta automáticamente y que tiene una gran capacidad de respuesta.