Condividi tramite


API di flusso di Orleans

Le applicazioni interagiscono con i flussi tramite API molto simili alle note estensioni Rx (Reactive Extensions) in .NET. La differenza principale riguarda il fatto che le estensioni di flusso di Orleans sono asincrone, per rendere l'elaborazione più efficiente nell'infrastruttura di calcolo distribuita e scalabile di Orleans.

Flusso asincrono

Un'applicazione inizia usando un provider di flussi per ottenere un handle a un flusso. Altre informazioni sui provider di flussi sono disponibili qui, ma per il momento è possibile paragonarlo a una factory di flussi che consente agli implementatori di personalizzare il comportamento e la semantica dei flussi:

IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");

Un'applicazione può ottenere un riferimento al provider di flussi chiamando il metodo Grain.GetStreamProvider dall'interno di un grano o chiamando il metodo GrainClient.GetStreamProvider dal client.

Orleans.Streams.IAsyncStream<T> è un handle logico fortemente tipizzato a un flusso virtuale. È simile a un riferimento a un grano in Orleans. Le chiamate a GetStreamProvider e GetStream sono esclusivamente locali. Gli argomenti di GetStream sono un GUID e una stringa aggiuntiva che costituiscono lo spazio dei nomi del flusso (che può essere Null). Insieme, il GUID e la stringa dello spazio dei nomi compongono l'identità del flusso (in modo analogo agli argomenti di IGrainFactory.GetGrain). La combinazione di GUID e stringa dello spazio dei nomi offre maggiore flessibilità per determinare le identità del flusso. Come il grano 7 può esistere all'interno del tipo di grano PlayerGrain e un diverso grano 7 può esistere all'interno del tipo di grano ChatRoomGrain, anche il flusso 123 può esistere nello spazio dei nomi di flusso PlayerEventsStream e un diverso flusso 123 può esistere nello spazio dei nomi di flusso ChatRoomMessagesStream.

Produzione e utilizzo

IAsyncStream<T> implementa entrambe le interfacce IAsyncObserver<T> e IAsyncObservable<T>. In questo modo, un'applicazione può usare il flusso per produrre nuovi eventi nel flusso tramite Orleans.Streams.IAsyncObserver<T> oppure per sottoscrivere e utilizzare gli eventi da un flusso tramite Orleans.Streams.IAsyncObservable<T>.

public interface IAsyncObserver<in T>
{
    Task OnNextAsync(T item, StreamSequenceToken token = null);
    Task OnCompletedAsync();
    Task OnErrorAsync(Exception ex);
}

public interface IAsyncObservable<T>
{
    Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}

Per produrre eventi nel flusso, un'applicazione esegue la chiamata seguente

await stream.OnNextAsync<T>(event)

Per sottoscrivere un flusso, un'applicazione esegue la chiamata seguente

StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)

L'argomento di SubscribeAsync può essere un oggetto che implementa l'interfaccia IAsyncObserver<T> o una combinazione di funzioni lambda per l'elaborazione degli eventi in ingresso. Sono disponibili altre opzioni per SubscribeAsync tramite la classe AsyncObservableExtensions. SubscribeAsync restituisce StreamSubscriptionHandle<T>, che è un handle opaco che è possibile usare per annullare la sottoscrizione del flusso (in modo analogo a una versione asincrona di IDisposable).

await subscriptionHandle.UnsubscribeAsync()

È importante notare che la sottoscrizione riguarda un grano e non l'attivazione. Dopo che il codice del grano ha effettuato la sottoscrizione del flusso, questa sottoscrizione supera la durata dell'attivazione e rimane valida per sempre, fino a quando il codice del grano (potenzialmente in un'attivazione diversa) non annulla esplicitamente la sottoscrizione. Questo concetto è alla base dell'astrazione di un flusso virtuale: non solo tutti i flussi esistono sempre, dal punto di vista logico, ma anche una sottoscrizione del flusso è permanente e rimane valida anche oltre una particolare attivazione fisica che ha creato la sottoscrizione.

Molteplicità

Un flusso di Orleans può avere più producer e più consumer. Un messaggio pubblicato da un producer verrà recapitato a tutti i consumer che hanno effettuato la sottoscrizione del flusso prima della pubblicazione del messaggio.

Il consumer può inoltre effettuare la sottoscrizione dello stesso flusso più volte. Ogni volta che effettua una sottoscrizione, riceve un oggetto StreamSubscriptionHandle<T> univoco. Se un grano (o client) effettua la sottoscrizione X volte per lo stesso flusso, riceverà lo stesso evento X volte, una volta per ogni sottoscrizione. Il consumer può anche annullare una singola sottoscrizione. È possibile trovare tutte le sottoscrizioni correnti chiamando:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

Ripristino da errori

Se il producer di un flusso non è più presente, o il relativo grano viene disattivato, non è necessario eseguire alcuna operazione. La volta successiva che il grano vuole produrre eventi, può ottenere di nuovo l'handle del flusso e produrre nuovi eventi nello stesso modo.

La logica del consumer è un po' più complessa. Come illustrato in precedenza, una volta che il grano di un consumer sottoscrive un flusso, tale sottoscrizione rimane valida fino a quando il grano non la annulla in modo esplicito. Se il consumer del flusso non è più presente, o il relativo grano viene disattivato, e viene generato un nuovo evento nel flusso, il grano del consumer viene riattivato automaticamente (proprio come qualsiasi grano di Orleans viene attivato automaticamente quando gli viene inviato un messaggio). L'unica operazione che il codice del grano deve eseguire è fornire un oggetto IAsyncObserver<T> per l'elaborazione dei dati. Il consumer deve ricollegare la logica di elaborazione come parte del metodo OnActivateAsync(). A tale scopo, può chiamare:

StreamSubscriptionHandle<int> newHandle =
    await subscriptionHandle.ResumeAsync(IAsyncObserver);

Per "riprendere l'elaborazione", il consumer usa l'handle precedente ottenuto al momento della prima sottoscrizione. Si noti che ResumeAsync aggiorna semplicemente una sottoscrizione esistente con la nuova istanza della logica di IAsyncObserver e non modifica il fatto che il consumer abbia già effettuato la sottoscrizione al flusso.

In che modo il consumer ottiene un oggetto subscriptionHandle precedente? Sono disponibili due opzioni. Il consumer può aver salvato in modo permanente l'handle ricevuto dall'operazione SubscribeAsync originale e può quindi usarlo. In alternativa, se il consumer non dispone dell'handle, può richiedere un oggetto IAsyncStream<T> per tutti gli handle di sottoscrizione attivi chiamando:

IList<StreamSubscriptionHandle<T>> allMyHandles =
    await IAsyncStream<T>.GetAllSubscriptionHandles();

Il consumer può ora riprendere tutte le elaborazioni o annullare alcune sottoscrizioni, se necessario.

Suggerimento

Se il grano del consumer implementa l'interfaccia IAsyncObserver<T> direttamente (public class MyGrain<T> : Grain, IAsyncObserver<T>), in teoria non deve necessariamente ricollegare IAsyncObserver e pertanto non sarà necessario chiamare ResumeAsync. Il runtime di flusso deve essere in grado di determinare automaticamente che il grano implementa già IAsyncObserver e richiamerà solo tali metodi IAsyncObserver. Tuttavia, il runtime di flusso attualmente non supporta questo comportamento e il codice del grano deve quindi chiamare ResumeAsync in modo esplicito anche se il grano implementa IAsyncObserver.

Sottoscrizioni esplicite e implicite

Per impostazione predefinita, un consumer di flusso deve sottoscrivere in modo esplicito il flusso. Questa sottoscrizione viene in genere attivata da un messaggio esterno ricevuto dal grano (o client) che indica di effettuare la sottoscrizione. In un servizio di chat, ad esempio, quando un utente partecipa a una chat riceve un messaggio JoinChatGroup con il nome della chat, che fa sì che il grano dell'utente sottoscriva il flusso di chat.

I flussi di Orleans supportano inoltre le sottoscrizioni implicite. In questo modello, il grano non effettua in modo esplicito la sottoscrizione del flusso. La sottoscrizione avviene automaticamente, in modo implicito, in base all'identità del grano e a un oggetto ImplicitStreamSubscriptionAttribute. Il valore principale delle sottoscrizioni implicite è la possibilità di attivare automaticamente il grano (e quindi la sottoscrizione). Usando i flussi SMS, ad esempio, se un grano vuole produrre un flusso e un altro grano elabora il flusso, il producer deve conoscere l'identità del grano consumer ed effettuare una chiamata al grano per chiedere di sottoscrivere il flusso. Solo dopo questa operazione sarà possibile iniziare a inviare eventi. Usando invece le sottoscrizioni implicite, il producer può semplicemente iniziare a produrre eventi per un flusso e il grano consumer verrà attivato e sottoscriverà il flusso automaticamente. In tal caso, al producer non interessa sapere chi legge gli eventi

L'oggetto MyGrainType dell'implementazione del grano può dichiarare un attributo [ImplicitStreamSubscription("MyStreamNamespace")]. Ciò indica al runtime di flusso che quando viene generato un evento in un flusso la cui identità è costituita dal GUID XXX e dallo spazio dei nomi "MyStreamNamespace", tale evento deve essere recapitato al grano la cui identità è XXX di tipo MyGrainType. Ovvero, il runtime esegue il mapping dell'oggetto <XXX, MyStreamNamespace> del flusso all'oggetto <XXX, MyGrainType> del grano consumer.

La presenza di ImplicitStreamSubscription fa in modo che il runtime di flusso effettui automaticamente la sottoscrizione del flusso per il grano e recapiti a esso gli eventi del flusso. Tuttavia, il codice del grano deve comunque indicare al runtime come vuole che vengano elaborati gli eventi. Essenzialmente, deve collegare l'oggetto IAsyncObserver. Pertanto, quando il grano viene attivato, il relativo codice all'interno di OnActivateAsync deve chiamare:

IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

StreamId streamId =
    StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
    streamProvider.GetStream<T>(streamId);

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
    base.GetStreamProvider("SimpleStreamProvider");

IAsyncStream<T> stream =
    streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");

StreamSubscriptionHandle<T> subscription =
    await stream.SubscribeAsync(IAsyncObserver<T>);

Scrittura della logica di sottoscrizione

Di seguito sono riportate le linee guida su come scrivere la logica di sottoscrizione per vari casi: sottoscrizioni esplicite e implicite, flussi riavvolgibili e non riavvolgibili. La differenza principale tra sottoscrizioni esplicite e implicite è che per l'opzione implicita il grano ha sempre esattamente una sottoscrizione implicita per ogni spazio dei nomi del flusso e non è possibile creare più sottoscrizioni (non è prevista la molteplicità delle sottoscrizioni), non c'è alcun modo per annullare la sottoscrizione e la logica del grano deve sempre solo collegare la logica di elaborazione. Ciò significa anche che per le sottoscrizioni implicite non è mai necessario riprendere una sottoscrizione. D'altra parte, per le sottoscrizioni esplicite, è necessario riprendere la sottoscrizione, in caso contrario, se il grano effettua di nuovo la sottoscrizione, questa sarà presente più volte.

Sottoscrizioni implicite:

Per le sottoscrizioni implicite, la granularità deve comunque sottoscrivere per collegare la logica di elaborazione. Questa operazione può essere eseguita nella granularità del consumer implementando le IStreamSubscriptionObserver interfacce e IAsyncObserver<T> , consentendo l'attivazione separata dalla sottoscrizione. Per sottoscrivere il flusso, l'granularità crea un handle e chiama await handle.ResumeAsync(this) nel relativo OnSubscribed(...) metodo.

Per elaborare i messaggi, il IAsyncObserver<T>.OnNextAsync(...) metodo viene implementato per ricevere dati di flusso e un token di sequenza. In alternativa, il ResumeAsync metodo può accettare un set di delegati che rappresentano i metodi dell'interfaccia IAsyncObserver<T> , onNextAsync, onErrorAsynce onCompletedAsync.

public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
    _logger.LogInformation($"Received an item from the stream: {item}");
}

public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
    var handle = handleFactory.Create<string>();
    await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(
            this.GetPrimaryKey(), "MyStreamNamespace");

    await stream.SubscribeAsync(OnNextAsync);
}

Sottoscrizioni esplicite:

Per le sottoscrizioni esplicite, un grano deve chiamare SubscribeAsync per sottoscrivere il flusso. Viene così creata una sottoscrizione e viene collegata la logica di elaborazione. La sottoscrizione esplicita sarà presente fino a quando non viene annullata dal grano, quindi se un grano viene disattivato e riattivato, la sottoscrizione esplicita sarà ancora presente ma non sarà collegata alcuna logica di elaborazione. In questo caso, il grano deve ricollegare la logica di elaborazione. A tale scopo, in OnActivateAsync il grano deve prima determinare le sottoscrizioni presenti, chiamando IAsyncStream<T>.GetAllSubscriptionHandles(). Il grano deve eseguire ResumeAsync in ogni handle con cui desidera continuare l'elaborazione o UnsubscribeAsync negli handle non più necessari. Facoltativamente, il grano può specificare anche StreamSequenceToken come argomento della chiamata ResumeAsync affinché la sottoscrizione esplicita inizi l'utilizzo da tale token.

public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
    var stream = streamProvider.GetStream<string>(streamId);

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    foreach (var handle in subscriptionHandles)
    {
       await handle.ResumeAsync(this);
    } 
}
public async override Task OnActivateAsync()
{
    var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
    var stream =
        streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");

    var subscriptionHandles = await stream.GetAllSubscriptionHandles();
    if (!subscriptionHandles.IsNullOrEmpty())
    {
        subscriptionHandles.ForEach(
            async x => await x.ResumeAsync(OnNextAsync));
    }
}

Ordine dei flussi e token di sequenza

L'ordine di recapito degli eventi tra un singolo producer e un singolo consumer dipende dal provider di flussi.

Con SMS il producer controlla in modo esplicito l'ordine degli eventi visualizzati dal consumer controllando il modo in cui vengono pubblicati. Per impostazione predefinita, se l'opzione SimpleMessageStreamProviderOptions.FireAndForgetDelivery per il provider SMS è impostata su false e se il producer attende ogni chiamata di OnNextAsync, gli eventi arrivano in base all'ordine FIFO. In SMS spetta al producer decidere come gestire gli errori di recapito che verranno indicati da un oggetto Task con errore restituito dalla chiamata di OnNextAsync.

I flussi di Coda di Azure non garantiscono l'ordine FIFO, perché l'istanza di Code di Azure sottostante non garantisce l'ordine nei casi di errore. L'ordine FIFO è invece garantito nelle esecuzioni senza errori. Quando un producer genera l'evento in Coda di Azure, se l'operazione di accodamento ha esito negativo, spetta al producer cercare di usare un'altra coda e successivamente gestire i potenziali messaggi duplicati. Sul lato del recapito, il runtime di flusso di Orleans rimuove l'evento dalla coda e cerca di recapitarlo per l'elaborazione ai consumer. Il runtime di flusso di Orleans elimina l'evento dalla coda solo quando l'elaborazione viene completata correttamente. Se il recapito o l'elaborazione non riesce, l'evento non viene eliminato dalla coda e riappare nella coda in un secondo momento. Il runtime di flusso tenterà di recapitarlo di nuovo, con la possibilità così che l'ordine FIFO venga interrotto. Il comportamento precedente corrisponde alla normale semantica di Code di Azure.

Ordine definito dall'applicazione: per risolvere i problemi di ordinamento illustrati in precedenza, un'applicazione può specificare l'ordine da usare. A tale scopo viene usato StreamSequenceToken, che è un oggetto IComparable opaco che può essere usato per ordinare gli eventi. Un producer può passare un oggetto StreamSequenceToken facoltativo alla chiamata di OnNext. Questo oggetto StreamSequenceToken verrà passato al consumer e verrà recapitato insieme all'evento. In questo modo, un'applicazione può ricostruire l'ordine indipendentemente dal runtime di flusso.

Flussi riavvolgibili

Alcuni flussi consentono a un'applicazione di effettuare la sottoscrizione solo a partire dall'ultimo punto temporale, mentre altri consentono di "tornare indietro nel tempo". Quest'ultima funzionalità dipende dalla tecnologia di accodamento sottostante e dal provider di flussi specifico. Ad esempio, il servizio Code di Azure consente di utilizzare solo gli eventi accodati più recenti, mentre Hub eventi consente la riproduzione di eventi da un punto arbitrario nel tempo (fino a un momento di scadenza). I flussi che consentono di tornare indietro nel tempo vengono detti flussi riavvolgibili.

Il consumer di un flusso riavvolgibile può passare un oggetto StreamSequenceToken alla chiamata di SubscribeAsync. Il runtime recapiterà gli eventi a partire da tale oggetto StreamSequenceToken. Un token Null indica che il consumer vuole ricevere gli eventi solo a partire dal più recente.

La possibilità di riavvolgere un flusso è molto utile negli scenari di ripristino. Si consideri, ad esempio, un grano che sottoscrive un flusso e ne controlla periodicamente lo stato insieme al token di sequenza più recente. Quando si esegue il ripristino da un errore, il grano può effettuare di nuovo la sottoscrizione dello stesso flusso dall'ultimo token di sequenza controllato, eseguendo così il ripristino senza perdere eventi generati dall'ultimo checkpoint.

Il provider di Hub eventi è riavvolgibile. Il relativo codice è disponibile in GitHub: Orleans/Azure/Orleans.Streaming.EventHubs. I provider SMS e di Coda di Azure non sono riavvolgibili.

Elaborazione senza stato con scalabilità orizzontale automatica

Per impostazione predefinita, il flusso di Orleans è pensato per supportare un numero elevato di flussi relativamente piccoli, ognuno elaborato da uno o più grani con stato. Collettivamente, l'elaborazione di tutti i flussi viene partizionata tra un numero elevato di grani regolari (con stato). Il codice dell'applicazione controlla il partizionamento assegnando ID di flusso e ID di grano ed effettuando la sottoscrizione in modo esplicito. L'obiettivo è l'elaborazione con stato partizionata.

C'è tuttavia anche un interessante scenario di elaborazione senza stato con scalabilità orizzontale automatica. In questo scenario, un'applicazione ha un numero ridotto di flussi (o anche un flusso di grandi dimensioni) e l'obiettivo è l'elaborazione senza stato. Un esempio è un flusso globale di eventi, in cui l'elaborazione comporta la decodifica di ogni evento e potenzialmente l'inoltro ad altri flussi per ulteriori operazioni di elaborazione con stato. L'elaborazione del flusso senza stato con scalabilità orizzontale può essere supportata in Orleans tramite grani StatelessWorkerAttribute.

Stato corrente dell'elaborazione automatica con scalabilità orizzontale senza stato: questa opzione non è ancora implementata. Un tentativo di sottoscrivere un flusso da un grano StatelessWorker causa un comportamento indefinito. L'aggiunta del supporto di questa opzione è prevista in futuro.

Grani e client Orleans

I flussi di Orleans funzionano in modo uniforme tra grani e client Orleans. Ovvero,è possibile usare le stesse API in un grano e in un client Orleans per produrre e utilizzare gli eventi. Ciò semplifica notevolmente la logica dell'applicazione, rendendo ridondanti le API lato client speciali, ad esempio gli observer dei grani.

Componente di pubblicazione/sottoscrizione completamente gestito e affidabile

Per tenere traccia delle sottoscrizioni dei flussi, Orleans usa un componente di runtime di pubblicazione/sottoscrizione dei flussi che funge da punto di incontro per i consumer di flussi e i producer di flussi. Il componente di pubblicazione/sottoscrizione tiene traccia di tutte le sottoscrizioni dei flussi rendendole permanenti e associa i consumer di flussi ai producer di flussi.

Le applicazioni possono scegliere dove e come vengono archiviati i dati di pubblicazione/sottoscrizione. Il componente di pubblicazione/sottoscrizione viene implementato sotto forma di grani (PubSubRendezvousGrain), che usano la persistenza dichiarativa di Orleans. PubSubRendezvousGrain usa il provider di archiviazione denominato PubSubStore. Come nel caso dei grani, è possibile designare un'implementazione per un provider di archiviazione. Per il componente di pubblicazione/sottoscrizione dei flussi è possibile modificare l'implementazione di PubSubStore al momento della creazione del silo usando il generatore di host del silo:

Il codice seguente configura il componente di pubblicazione/sottoscrizione per archiviare lo stato nelle tabelle di Azure.

hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
    options => options.ConnectionString = "<Secret>");

I dati di pubblicazione/sottoscrizione verranno così archiviati in modo permanente nella tabella di Azure. Per lo sviluppo iniziale, è anche possibile usare l'archiviazione in memoria. Oltre al componente di pubblicazione/sottoscrizione, il runtime di flusso di Orleans recapita gli eventi dai producer ai consumer, gestisce tutte le risorse di runtime allocate ai flussi usati attivamente ed esegue in modo trasparente attività di Garbage Collection delle risorse di runtime dai flussi inutilizzati.

Impostazione

Per usare i flussi è necessario abilitare i provider di flussi tramite l'host del silo o i generatori di client del cluster. Altre informazioni sui provider di flussi sono disponibili qui. Configurazione di esempio di un provider di flussi:

hostBuilder.AddMemoryStreams("StreamProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConfigureTableServiceClient("<Secret>")))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
    .AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
        optionsBuilder => optionsBuilder.Configure(
            options => options.ConnectionString = "<Secret>"))
    .AddAzureTableGrainStorage("PubSubStore",
        options => options.ConnectionString = "<Secret>");

Vedi anche

Provider di flussi di Orleans