API de streaming de Orleans

Las aplicaciones interactúan con flujos a través de API que son muy similares a las conocidas Extensiones reactivas (Rx) en .NET. La principal diferencia es que las extensiones de flujo de Orleans son asincrónicas, para hacer que el procesamiento sea más eficaz en el tejido de proceso distribuido y escalable de Orleans.

Flujo asincrónico

Una aplicación comienza mediante un proveedor de flujos para obtener un manipulador de un flujo. Puede leer más sobre los proveedores de flujos aquí, pero por ahora, puede considerarlo como un generador de flujos que permite a los implementadores personalizar el comportamiento y la semántica de los flujos:

IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");

Una aplicación puede obtener una referencia al proveedor de flujos llamando al método Grain.GetStreamProvider cuando se encuentra dentro de un grano o llamando al método GrainClient.GetStreamProvider cuando está en el cliente.

Orleans.Streams.IAsyncStream<T> es un manipulador lógico fuertemente tipado para un flujo virtual. Es similar en espíritu a la Referencia de grano de Orleans. Las llamadas a GetStreamProvider y GetStream son puramente locales. Los argumentos para GetStream son un GUID y una cadena adicional que llamamos a un espacio de nombres de flujo (que puede ser null). Juntos, el GUID y la cadena de espacio de nombres componen la identidad de flujo (similar en espíritu a los argumentos de IGrainFactory.GetGrain). La combinación de GUID y cadena de espacio de nombres proporciona flexibilidad adicional para determinar las identidades de flujo. Al igual que el grano 7 puede existir dentro del tipo de grano PlayerGrain y un grano 7 diferente puede existir dentro del tipo de grano ChatRoomGrain, el flujo 123 puede existir con el espacio de nombres de flujo PlayerEventsStream y puede existir un flujo 123 diferente dentro del espacio de nombres de flujo ChatRoomMessagesStream.

Producción y consumo

IAsyncStream<T> implementa las interfaces IAsyncObserver<T> y IAsyncObservable<T>. De este modo, una aplicación puede usar el flujo para generar nuevos eventos en el flujo mediante Orleans.Streams.IAsyncObserver<T> o para suscribirse y consumir eventos de un flujo mediante Orleans.Streams.IAsyncObservable<T>.

public interface IAsyncObserver<in T>
{
    Task OnNextAsync(T item, StreamSequenceToken token = null);
    Task OnCompletedAsync();
    Task OnErrorAsync(Exception ex);
}

public interface IAsyncObservable<T>
{
    Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}

Para generar eventos en el flujo, una aplicación simplemente genera una llamada

await stream.OnNextAsync<T>(event)

Para suscribirse a un flujo, una aplicación genera una llamada

StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)

El argumento para SubscribeAsync puede ser un objeto que implementa la interfaz IAsyncObserver<T> o una combinación de funciones lambda para procesar eventos entrantes. Hay más opciones para SubscribeAsync disponibles a través de la clase AsyncObservableExtensions. SubscribeAsync devuelve un StreamSubscriptionHandle<T>, que es un manipulador opaco que se puede usar para cancelar la suscripción del flujo (similar en espíritu a una versión asincrónica de IDisposable).

await subscriptionHandle.UnsubscribeAsync()

Es importante tener en cuenta que la suscripción es para un grano, no para la activación. Una vez que el código de grano se suscribe al flujo, esta suscripción supera la vida de esta activación y permanece duradera para siempre, hasta que el código de grano (potencialmente en una activación diferente) cancela explícitamente la suscripción. Este es el corazón de una abstracción de flujo virtual: no solo existen siempre todos los flujos, lógicamente, sino que también una suscripción de flujo es duradera y vive más allá de una activación física determinada que creó la suscripción.

Multiplicidad

Un flujo de Orleans puede tener varios productores y varios consumidores. Un mensaje publicado por un productor se entregará a todos los consumidores que se hayan suscrito al flujo antes de que se publique el mensaje.

Además, el consumidor puede suscribirse al mismo flujo varias veces. Cada vez que se suscribe, vuelve a obtener un único StreamSubscriptionHandle<T>. Si un grano (o cliente) se suscribe X veces al mismo flujo, recibirá el mismo evento X veces, una vez para cada suscripción. El consumidor también puede cancelar la suscripción individual. Puede encontrar todas sus suscripciones actuales llamando a:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

Recuperación de errores

Si el productor de un flujo muere (o su grano está desactivado), no hay nada que hacer. La próxima vez que este grano quiera generar más eventos, puede volver a obtener el manipulador del flujo y generar nuevos eventos de la misma manera.

La lógica del consumidor es un poco más implicada. Como hemos dicho antes, una vez que se suscribe un grano de consumidor a un flujo, esta suscripción es válida hasta que el grano cancela la suscripción explícitamente. Si el consumidor del flujo muere (o su grano está desactivado) y se genera un nuevo evento en el flujo, el grano del consumidor se volverá a activar automáticamente (al igual que cualquier grano regular de Orleans se activa automáticamente cuando se envía un mensaje a este). Lo único que el código de grano debe hacer ahora es proporcionar un IAsyncObserver<T> para procesar los datos. El consumidor debe volver a adjuntar la lógica de procesamiento como parte del método OnActivateAsync(). Para ello, puede llamar a:

StreamSubscriptionHandle<int> newHandle =
    await subscriptionHandle.ResumeAsync(IAsyncObserver);

El consumidor usa el manipulador anterior que obtuvo cuando se suscribió por primera vez a «reanudar procesamiento». Tenga en cuenta que ResumeAsync simplemente actualiza una suscripción existente con la nueva instancia de lógica IAsyncObserver y no cambia el hecho de que este consumidor ya está suscrito a este flujo.

¿Cómo obtiene el consumidor un antiguo subscriptionHandle? Existen dos opciones. Es posible que el consumidor haya conservado el manipulador que se devolvió de la operación SubscribeAsync original y puede usarlo ahora. Como alternativa, si el consumidor no tiene el manipulador, puede solicitar IAsyncStream<T> para todos sus manipuladores de suscripción activos llamando a:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

El consumidor ahora puede reanudar todos ellos o cancelar la suscripción de algunos si lo desea.

Sugerencia

Si el grano del consumidor implementa la interfaz IAsyncObserver<T> directamente (public class MyGrain<T> : Grain, IAsyncObserver<T>), no debe requerirse en teoría para volver a adjuntar IAsyncObserver y, por lo tanto, no tendrá que llamar a ResumeAsync. El entorno de ejecución de flujo debe ser capaz de averiguar automáticamente que el grano ya implementa IAsyncObserver y simplemente invocará esos métodos IAsyncObserver. Sin embargo, el entorno de ejecución de flujo actualmente no admite esto y el código de grano todavía necesita llamar explícitamente a ResumeAsync, incluso si el grano implementa IAsyncObserver directamente.

Suscripciones explícitas e implícitas

De forma predeterminada, un consumidor de flujos tiene que suscribirse explícitamente al flujo. Normalmente, esta suscripción se desencadenaría mediante algún mensaje externo que recibe el grano (o el cliente) que le indica que se suscriba. Por ejemplo, en un servicio de chat cuando un usuario se une a una sala de chat, su grano recibe un mensaje JoinChatGroup con el nombre del chat, lo que hará que el usuario se suscriba a este flujo de chat.

Además, los flujos de Orleans también admiten suscripciones implícitas. En este modelo, el grano no se suscribe explícitamente al flujo. Este grano se suscribe automáticamente, implícitamente, solo en función de su identidad específica y de un ImplicitStreamSubscriptionAttribute. El valor principal de las suscripciones implícitas permite que la actividad de flujo desencadene la activación específica (por lo tanto, desencadenar la suscripción) automáticamente. Por ejemplo, mediante flujos SMS, si un grano quería generar un flujo y otro proceso de grano, este flujo tendría que conocer la identidad del grano del consumidor y hacer una llamada de grano para indicarle que se suscriba al flujo. Solo después de eso, puede empezar a enviar eventos. En su lugar, mediante suscripciones implícitas, el productor solo puede empezar a producir eventos en un flujo, y el grano del consumidor se activará y suscribirá automáticamente al flujo. En ese caso, al productor no le importa quién está leyendo los eventos

MyGrainType de implementación de grano puede declarar un atributo [ImplicitStreamSubscription("MyStreamNamespace")]. Esto indica al runtime de flujo que cuando se genera un evento en un flujo cuya identidad es GUID XXX y espacio de nombres "MyStreamNamespace", se debe entregar al grano cuya identidad es XXX de tipo MyGrainType. Es decir, el runtime asigna el flujo <XXX, MyStreamNamespace> al grano del consumidor <XXX, MyGrainType>.

La presencia de ImplicitStreamSubscription hace que el runtime de flujo suscriba automáticamente este grano a un flujo y entregue los eventos de flujo. Sin embargo, el código de grano todavía debe indicar al runtime cómo quiere que se procesen los eventos. Básicamente, debe adjuntar IAsyncObserver. Por lo tanto, cuando se activa el grano, el código de grano dentro de OnActivateAsync debe llamar a:

IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

StreamId streamId =
    StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
    streamProvider.GetStream<T>(streamId);

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

IAsyncStream<T> stream =
    streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);

Escritura de lógica de suscripción

A continuación se muestran las instrucciones sobre cómo escribir la lógica de suscripción para varios casos: suscripciones explícitas e implícitas, flujos rebobinables y no rebobinables. La principal diferencia entre las suscripciones explícitas e implícitas es que para el grano implícito siempre tiene exactamente una suscripción implícita para cada espacio de nombres de flujo; no hay ninguna manera de crear varias suscripciones (no hay multiplicidad de suscripción), no hay ninguna manera de cancelar la suscripción, y la lógica de grano siempre necesita adjuntar la lógica de procesamiento. Esto también significa que para las suscripciones implícitas nunca es necesario reanudar una suscripción. Por otro lado, para las suscripciones explícitas, es necesario reanudar la suscripción; de lo contrario, si el grano se suscribe de nuevo, dará lugar a que el grano se suscriba varias veces.

Suscripciones implícitas:

En el caso de las suscripciones implícitas, el grano debe suscribirse para adjuntar la lógica de procesamiento. Esto debe hacerse en el método del grano OnActivateAsync. El grano simplemente debe ejecutar await stream.SubscribeAsync(OnNext ...) en su método OnActivateAsync. Esto hará que esta activación determinada asocie la función OnNext para procesar ese flujo. Opcionalmente, el grano puede especificar StreamSequenceToken como argumento para SubscribeAsync, lo que hará que esta suscripción implícita empiece a consumir desde ese token. Nunca es necesario que una suscripción implícita llame a ResumeAsync.

public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
    var stream = streamProvider.GetStream<string>(streamId);

    await stream.SubscribeAsync(OnNextAsync)
}
public override async Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(
            this.GetPrimaryKey(), "MyStreamNamespace");

    await stream.SubscribeAsync(OnNextAsync)
}

Suscripciones explícitas:

En el caso de las suscripciones explícitas, un grando debe llamar a SubscribeAsync para suscribirse al flujo. Esto crea una suscripción, así como asocia la lógica de procesamiento. La suscripción explícita existirá hasta que el grano se cancele, por lo que si un grano se desactiva y reactiva, el grano todavía se suscribe explícitamente, pero no se adjuntará ninguna lógica de procesamiento. En este caso, el grano debe volver a adjuntar la lógica de procesamiento. Para ello, en su OnActivateAsync, el grano primero debe averiguar qué suscripciones tiene, llamando a IAsyncStream<T>.GetAllSubscriptionHandles(). El grano debe ejecutar ResumeAsync en cada manipulador con el que desea continuar procesando o cancelar la suscripción asincrónica en los manipuladores con los que se realiza. Opcionalmente, el grano puede especificar StreamSequenceToken como argumento para ResumeAsync, lo que hará que esta suscripción implícita empiece a consumir desde ese token.

public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
    var stream = streamProvider.GetStream<string>(streamId);

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    if (!subscriptionHandles.IsNullOrEmpty())
    {
        subscriptionHandles.ForEach(
            async x => await x.ResumeAsync(OnNextAsync));
    }
}
public async override Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    if (!subscriptionHandles.IsNullOrEmpty())
    {
        subscriptionHandles.ForEach(
            async x => await x.ResumeAsync(OnNextAsync));
    }
}

Orden y tokens de flujo

El orden de entrega de eventos entre un productor individual y un consumidor individual depende del proveedor de flujo.

Con SMS, el productor controla explícitamente el orden de los eventos vistos por el consumidor mediante el control de la forma en que el productor los publica. De forma predeterminada (si la opción SimpleMessageStreamProviderOptions.FireAndForgetDelivery para el proveedor de SMS está establecida en falso) y si el productor espera cada llamada OnNextAsync, los eventos llegan en orden FIFO. En SMS, el productor decide cómo controlar los errores de entrega que se indicarán mediante un Task roto devuelto por la llamada OnNextAsync.

Los flujos de cola de Azure no garantizan el orden FIFO, ya que las colas de Azure subyacentes no garantizan el orden en casos de error. (Garantizan el orden FIFO en ejecuciones sin errores). Cuando un productor genera el evento en la cola de Azure, si se produce un error en la operación de cola, depende del productor intentar otra cola y, más adelante, tratar con posibles mensajes duplicados. En el lado de entrega, el runtime de Orleans Streaming pone en cola el evento de la cola e intenta entregarlo para su procesamiento a los consumidores. El runtime de Orleans Streaming elimina el evento de la cola solo tras el procesamiento correcto. Si se produce un error en la entrega o el procesamiento, el evento no se elimina de la cola y volverá a aparecer automáticamente en la cola más adelante. El runtime de flujo intentará entregarlo de nuevo, lo que podría interrumpir el pedido FIFO. El comportamiento anterior coincide con la semántica normal de las colas de Azure.

Orden definido por la aplicación: para tratar los problemas de ordenación anteriores, una aplicación puede especificar opcionalmente su orden. Esto se logra a través de StreamSequenceToken, que es un objeto IComparable opaco que se puede usar para ordenar eventos. Un productor puede pasar un StreamSequenceToken opcional a la llamada OnNext. Este StreamSequenceToken se pasará al consumidor y se entregará junto con el evento. De este modo, una aplicación puede razonar y reconstruir su orden independientemente del entorno de ejecución de flujo.

Flujos rebobinables

Algunos flujos solo permiten que una aplicación se suscriba a ellas a partir del último momento en el tiempo, mientras que otros permiten «retroceder en el tiempo». Esta última funcionalidad depende de la tecnología de puesta en cola subyacente y del proveedor de flujos concreto. Por ejemplo, las colas de Azure solo permiten consumir los eventos en cola más recientes, mientras que EventHub permite reproducir eventos desde un momento dado arbitrario (hasta un tiempo de expiración). Los flujos que admiten el retroceso en el tiempo se denominan flujos rebobinables.

El consumidor de un flujo rebobinable puede pasar un StreamSequenceToken a la llamada SubscribeAsync. El tiempo de ejecución entregará eventos a partir de ese StreamSequenceToken. Un token null significa que el consumidor quiere recibir eventos a partir del más reciente.

La capacidad de rebobinar un flujo es muy útil en escenarios de recuperación. Por ejemplo, considere un detalle que se suscribe a un flujo y controla periódicamente su estado junto con el token de flujo más reciente. Al recuperarse de un error, el grano puede volver a suscribirse al mismo flujo desde el último token de flujo de punto de control, con lo que se recupera sin perder ningún evento generado desde el último punto de control.

El proveedor de Event Hubs se puede rebobinar. Puede encontrar su código en GitHub: Orleans/Azure/Orleans.Streaming.EventHubs. Los proveedores de SMS y Cola de Azureno se pueden rebobinar.

Procesamiento de escalado horizontal sin estado automáticamente

De manera predeterminada, Orleans Streaming está destinado a admitir un gran número de flujos relativamente pequeños, cada uno procesado por uno o más granos con estado. Colectivamente, el procesamiento de todos los flujos juntos se particiona entre un gran número de granos regulares (con estado). El código de la aplicación controla esta particionamiento mediante la asignación de identificadores de flujo e identificadores de grano y mediante la suscripción explícita. El objetivo es el procesamiento con estado particionado.

Sin embargo, también hay un escenario interesante de procesamiento sin estado escalado horizontal automáticamente. En este escenario, una aplicación tiene un pequeño número de flujos (o incluso un flujo grande) y el objetivo es el procesamiento sin estado. Un ejemplo es un flujo global de eventos, donde el procesamiento implica descodificar cada evento y reenviarlo a otros flujos para un procesamiento con estado adicional. El procesamiento de flujos con escalabilidad horizontal sin estado se puede admitir en Orleans a través de granos StatelessWorkerAttribute.

Estado actual del procesamiento automático con escalabilidad horizontal sin estado: esto aún no se ha implementado. Un intento de suscribirse a una secuencia de un grano dará como resultado un StatelessWorker comportamiento indefinido. Estamos pensando en admitir esta opción.

Granos y clientes de Orleans

Los flujos de Orleans funcionan uniformemente entre granos y clientes de Orleans. Es decir, se pueden usar las mismas API dentro de un grano y en un cliente de Orleans para producir y consumir eventos. Esto simplifica considerablemente la lógica de la aplicación, lo que hace a las API especiales del lado cliente, como observadores de granos, redundantes.

Pub-sub de flujo totalmente administrado y confiable

Para realizar un seguimiento de las suscripciones de flujo, Orleans usa un componente de runtime denominado Streaming Pub-Sub que sirve como punto de encuentro para los consumidores y productores de flujos. Pub-sub realiza un seguimiento de todas las suscripciones de flujo y las conserva y hace coincidir a los consumidores de flujos con los productores de flujos.

Las aplicaciones pueden elegir dónde y cómo se almacenan los datos Pub-Sub. El propio componente de Pub-Sub se implementa como granos (denominados PubSubRendezvousGrain), que usan persistencia declarativa de Orleans. PubSubRendezvousGrain usa el proveedor de almacenamiento denominado PubSubStore. Al igual que sucede con cualquier grano, puede designar una implementación para un proveedor de almacenamiento. En Streaming Pub-Sub se puede cambiar la implementación de PubSubStore en tiempo de construcción de silo mediante el generador de hosts de silo:

Lo siguiente configura el Pub-Sub para almacenar su estado en las tablas de Azure.

hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConnectionString = "<Secret>");

De este modo, los datos Pub-Sub se almacenarán de forma duradera en Azure Table. Para el desarrollo inicial, también se puede usar el almacenamiento de memoria. Además de Pub-Sub, el runtime de Orleans Streaming Runtime entrega eventos de productores a consumidores, administra todos los recursos en tiempo de ejecución asignados a flujos usados activamente y recopila de forma transparente los recursos de runtime de secuencias no utilizadas.

Configuración

Para usar flujos, debe habilitar los proveedores de flujos a través del host de silo o los generadores de cliente de clúster. Puede encontrar más información sobre los proveedores de flujo aquí. Configuración del proveedor de flujos de ejemplo:

hostBuilder.AddMemoryStreams("StreamProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConfigureTableServiceClient("<Secret>")))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConnectionString = "<Secret>"))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConnectionString = "<Secret>");

Consulte también

Proveedores de secuencias de Orleans